Parallel computing with Python

Nowadays, most computers have more than one physical CPU core, so they can compute things in parallel. Even mobile phones have multiple cores. In an ideal world, $N$ CPUs allow a speed-up of (certain) computations by a factor of $N$.

On big computing clusters, certain computations can even run on hundreds to thousands of CPUs simultaneously. For example, the Illustris TNG simulations of structure formation in the cosmos (you already worked with data from their predecessor, the Illustris project) utilised up to 24,000 cores over nearly 2 years (almost 150 million CPU hours in total) to finish one of the largest scientific computations ever. The simulations ran on the Hazel Hen supercomputer in Stuttgart and used the Arepo code developed by Volker Springel and his group (now at MPA in Garching, previously here in Heidelberg at the Centre for Astronomy).

Of course, we will not dive into such high-performance computing here, but Python has modules that allow us to parallelise certain computations. In particular, we will now look into the multiprocessing package.

In [ ]:
import multiprocessing as mp

First of all, how many cores does our machine actually have? To this end, we can use the cpu_count() function of multiprocessing.

In [ ]:
print('Number of processors:', mp.cpu_count())

So this is the maximum number of parallel processes our machine can handle. Note, however, that the above CPU count may also include logical CPUs if hyper-threading is available and enabled. The psutil package can distinguish between the two and deliver the number of physical cores. Hyper-threading may further speed up computations, but that depends on the jobs; sometimes it can even slow them down.

In [ ]:
import psutil

n_cpus = psutil.cpu_count(logical=False)
print('The number of physical cores is:', n_cpus)

A simple example

First, we will use the Pool object of multiprocessing to parallelise the computation of the cubes of a set of numbers. Such situations are trivial to parallelise because we can simply divide the whole workload into smaller chunks and assign the chunks to the different CPUs of our machine.

We will now implement one solution that uses the Pool.apply_async() method. Alternatives are, e.g., Pool.apply() and Pool.map(), and there exist more (a short sketch with Pool.map() follows the example below). We won't have time to go through the differences between these methods here, so if you want to learn more, we recommend having a look at the multiprocessing documentation.

First of all, we have to define a function that will be executed by the different CPUs. Here, this function takes a number and returns the original number together with its cube (we have added some "sleep" time here because the computation itself does not take very long...):

In [ ]:
import time

def cube(x):
    time.sleep(2/x)
    return (x, x**3)

We now set up a Pool object with processes=2 workers. Because we know how many CPUs our machine has, we could also have used pool = mp.Pool(processes=n_cpus).

In [ ]:
pool = mp.Pool(processes=2)

# submit the jobs asynchronously; apply_async() immediately returns
# AsyncResult objects that we collect in a list
results = []
for x in range(1, 11):
    results.append(pool.apply_async(cube, args=(x,)))

# retrieve the actual return values of cube with get()
for p in results:
    x, res = p.get()
    print(x, res)

# close the pool (no new jobs) and wait for all workers to finish
pool.close()
pool.join()

So how does this code work? As mentioned above, we have set up a pool of 2 workers via pool = mp.Pool(processes=2) to which we can now send computation jobs. Each job consists of evaluating the function cube with the arguments args=(x,); the AsyncResult handles returned by apply_async() are appended to the results list. The jobs are executed asynchronously, i.e. whenever a result is ready, it can be returned. The order of the job execution does not matter in our case.

The actual results of the workers are then retrieved with the get() method. Results come in whenever they are ready, so the order in which jobs finish need not be the same as the order in which they were submitted.

After requesting the results, we close the pool with pool.close() so that no new jobs can be submitted to our workers. Finally, we call pool.join(), which essentially tells Python to wait until all jobs are done.
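
For comparison, here is a minimal sketch (reusing the cube function from above) of how the same computation looks with Pool.map(), which blocks until all results are available and preserves the input order, and with Pool.imap_unordered(), which yields results as soon as they are completed:

with mp.Pool(processes=2) as pool:
    # map() distributes the iterable over the workers and returns
    # the results in the original input order
    print(pool.map(cube, range(1, 11)))

    # imap_unordered() instead yields results in completion order
    for x, res in pool.imap_unordered(cube, range(1, 11)):
        print(x, res)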

We have seen that it is not that difficult to parallelise simple tasks. We have to be more careful when programming, and the code will be a bit longer, but we are rewarded with faster computations that fully utilise the capabilities of modern computers.

During daily scientific work, speeding up certain computations can help a lot. Instead of waiting for hours for one result, one might be able to obtain it within minutes!

A slightly more complicated example - decoding md5

Let us now try to "crack" an md5 hash, a hash function that is (unfortunately) often used to store passwords. Note that you should never use md5 to hash passwords for storage - it is extremely weak, as we will see below.

In our example, we only consider strings of up to 4 characters that consist solely of lower-case letters (a-z). The md5 hash to crack is:

In [ ]:
hash_to_crack = '31d4541b8e926a24f0c9b835b68cfdf3'

We will use a brute-force method that illustrates why parallel computing can be efficient. To this end, we will create all possible strings of up to 4 characters, hash them with md5 and compare the resulting hash to hash_to_crack until we find the correct match.

To speed up the computations, we will launch a set of workers that then do the hashing and comparison in parallel. This is a trivial parallelisation because the individual workers do not need to communicate with each other. Such simple jobs can also be parallelised very efficiently on GPUs instead of CPUs, but we will not go into this here.

For starters, let's see how to generate an md5 hash in Python. This is best done with the hashlib package:

In [ ]:
import hashlib

def get_md5_hash(s):
    return hashlib.md5(s.encode('utf-8')).hexdigest()

print(get_md5_hash('test'))

We now define the allowed characters of the string, which are the lower-case letters from a to z, using the ord and chr functions that you encountered on Day 1:

In [ ]:
alphabet = [''] # add empty string (also ensures we probe all strings up to length 4)
for i in range(ord('a'), ord('z')+1):
    alphabet.append(chr(i))

print(alphabet)

(Note that we have added an empty string to the list. Why?)
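
Before parallelising, it is instructive to write down the single-process brute force as a baseline. The following sketch reuses get_md5_hash, alphabet and hash_to_crack from above and uses itertools.product to loop over all combinations of up to four characters:

from itertools import product

def crack_serial(target_hash):
    # try every candidate string of up to 4 characters from the alphabet
    for c1, c2, c3, c4 in product(alphabet, repeat=4):
        s = c1 + c2 + c3 + c4
        if get_md5_hash(s) == target_hash:
            return s
    return None

# print(crack_serial(hash_to_crack))  # uncomment to time the serial version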

We will now use different methods of the multiprocessing package than in the simple example above, namely the Manager and Queue objects. This is to illustrate more clearly how one can think of parallelisation, and also to show different capabilities of multiprocessing.

Our strategy is to launch a Manager that divides the work into individual jobs and sends them to Workers. As before, we have to define what each Worker has to do:

def do_work(in_queue, out_list):
    while (True):
        s = in_queue.get()

        # capture exit signal
        if s is None:
            return

        # do work
        h_s = get_md5_hash(s)
        if (h_s == hash_to_crack):
            out_list.append(s)

Here, each Worker takes a string from the queue in_queue, which is filled by the Manager, computes its md5 hash, compares it to the hash we want to decode and appends the string to out_list if a match is found. The Manager can also send a signal to the Workers that work is over and that they "can go home" (here, this happens when the item retrieved from in_queue is None, but we could also have chosen the string 'Feierabend', German for "end of the working day"...). Moreover, once one Worker has found a match, out_list is no longer empty; the Manager then stops submitting new jobs and sends the exit signal so that all Workers can stop. Note that anything appended to out_list by one Worker is automatically also visible to the main process and to all other Workers, because out_list is a shared list.

We now have to initialise the Manager and the Workers and let the Manager distribute the work, just as in the first parallelisation example above. We will also measure the time it takes the program to decode the hash via the time.time() function of the time module. On Unix systems, this function returns the time in seconds since January 1, 1970, 00:00:00 (UTC). Other operating systems may use different reference points.

In [ ]:
from itertools import product

def do_work(in_queue, out_list):
    while (True):
        s = in_queue.get()

        # capture exit signal
        if s is None:
            return

        # do work
        time.sleep(0.0005)
        h_s = get_md5_hash(s)
        if (h_s == hash_to_crack):
            out_list.append(s)
            
start = time.time()

num_workers = 4

manager = mp.Manager()
results = manager.list()
work = manager.Queue(num_workers)

# initialise pool of workers    
pool = []
for i in range(num_workers):
    p = mp.Process(target=do_work, args=(work, results))
    p.start()
    pool.append(p)

# split work
for c1, c2, c3, c4 in product(alphabet, repeat=4):
    s = c1+c2+c3+c4
    work.put(s)

    # stop sending jobs if a solution was found
    if (results):
        break

# send workers the exit signal
for i in range(num_workers):
    work.put(None)

# wait for all workers to finish
for p in pool:
    p.join()
    
end = time.time()

print('The computation took {:.2f} seconds'.format(end-start))

if (results):
    print('We found the plain text! It is "{:s}". Wow, this was indeed fast!'.format(results[0]))
else:
    print('We could not find a solution. :(')

Exercise 1: Go through the code line-by-line and try to understand what it does. Why did we add the empty string '' to alphabet above? How could you extend the code so that hashes generated from strings that also contain upper-case letters, numbers and special symbols can be decoded?

Hint: In the above code, we used the itertools.product method to make our code more readable. To better understand what this loop structure does, note that we could also have written it as:

for c1 in alphabet:
    for c2 in alphabet:
        for c3 in alphabet:
            for c4 in alphabet:
                s = c1+c2+c3+c4
                ...

It should also be clear from this example that the best passwords are long passwords. It matters less how many different characters you allow (e.g. in alphabet above); the length is much more important, because the number of possible combinations grows exponentially with length, and at some point brute-force methods simply can no longer check them all.
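
To make the scaling concrete, the following lines simply count the number of candidate strings for different alphabet sizes and password lengths (26 corresponds to lower-case letters only, 95 to all printable ASCII characters):

for n_chars in (26, 95):
    for length in (4, 8, 12):
        # number of possible strings = (alphabet size) ** (password length)
        print('{:2d} characters, length {:2d}: {:.2e} combinations'.format(
            n_chars, length, n_chars**length))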

It should be pointed out that the above code is not very efficient: we only test on the order of 10,000 hashes per second on 2 CPUs. Still, combined with a good dictionary instead of enumerating random strings, we could test all words of a standard English dictionary (~200,000 words) in about 20 seconds. Optimised codes on GPUs can probe a few billion hashes per second...
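
To illustrate the dictionary idea, here is a minimal single-process sketch; it assumes a plain-text word list with one word per line (the path /usr/share/dict/words is common on many Unix systems, but it may differ or be missing on your machine):

def crack_with_wordlist(target_hash, wordlist='/usr/share/dict/words'):
    # hash every word in the list and compare it to the target hash
    with open(wordlist) as f:
        for line in f:
            word = line.strip()
            if get_md5_hash(word) == target_hash:
                return word
    return None

# print(crack_with_wordlist(hash_to_crack))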

Final remark: Just type the hash_to_crack into Google and see what happens... If you have not yet been convinced, you should be now: md5 is a really bad choice for protecting sensitive information (such as passwords) in storage. The method is (a) cryptographically no longer secure and (b) far too fast, which makes brute-force attacks feasible. Salting certainly helps, but one should nevertheless use "slower" hash functions such as bcrypt to store passwords (and always at least use a salt!).
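
As a final, hedged sketch of what a better approach could look like with the standard library alone: hashlib.pbkdf2_hmac repeats the underlying hash many times (making each guess expensive) and takes an explicit salt (defeating precomputed rainbow tables). Dedicated password-hashing libraries such as bcrypt or argon2 are still preferable in practice.

import os
import hashlib

def hash_password(password, salt=None, iterations=600_000):
    # a random, per-password salt defeats precomputed (rainbow-table) attacks
    if salt is None:
        salt = os.urandom(16)
    key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, iterations)
    # the salt and iteration count must be stored alongside the derived key
    return salt, iterations, key

salt, iterations, key = hash_password('correct horse battery staple')
print(key.hex())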