One of the most tedious parts of working with data is processing time. Sometimes your data is small enough that this is never an issue. Other times, your data is so big that you sit… and wait… until a Pandas operation, or whatever other process you’re running, finishes. 2 hours? 28 hours? A quick and dirty way to avoid this is to study only a subset of your data; however, sometimes that is not possible, you must look at your data as a whole, and you realize your tools have limits in what they can do. Not all hope is lost though: the way to approach this problem is parallel programming. Parallel what? Let’s talk definitions.

Parallel programming means carrying out multiple computations simultaneously. To achieve this, we take advantage of our CPU and the fact that modern computers have at least two cores, so we can split and distribute our data and tasks among them, cutting the time spent on each significantly. If this still doesn’t make sense, imagine having to fill 100 jars with 1000 jelly beans each. For one worker, this would take a long time, but what if you had 2, 4, or 10 workers doing the same task simultaneously? That’s how parallel computing works: the more CPU cores we use, the more we can accelerate our computations. Understanding how parallel programming works and how to implement it is an incredibly useful tool to have in your belt, especially if you work with data.


Python has a few choices to accomplish this. Here, I will discuss Dask and multiprocessing. These two packages work on the same principle: divide and conquer. multiprocessing is a built-in package and, hence, more robust; it lets you have more control over your parameters, how you split the data, the number of workers you want, etc., while Dask takes care of all of that for you.

Here’s the dataset we will use for this example (UN General Debates Text Data). It is not a massive dataset by any means, but for this tutorial’s purposes it will show how much time we can save with parallel processing, and the savings only grow when your dataset is 10 times as big.

Let’s import our libraries…
If you don’t have dask, install it like so: pip install “dask[complete]”

#---Import libraries
import pandas as pd
import multiprocessing as mp
import dask.dataframe as dd
import dask.array as da
import numpy as np
from multiprocessing import Pool

Using Parallel Programming with Arrays

Let’s now create a simple function to understand how parallel programming works and how to execute it with dask and multiprocessing.

def simple_sum(x, y):
    #return sum of two numbers
    add = x+y
    return add
a = np.arange(2000) #create two arrays
b = np.arange(2000)

In order to use the multiprocessing library, we need to define how many workers we want. A common rule of thumb is to use as many workers as you have logical cores, which is often twice the number of physical cores. In my case, that gives 4 workers.

NUM_PROCESSES = 4
pool = mp.Pool(processes=NUM_PROCESSES)
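
If you’d rather not hard-code that number, multiprocessing can count for you; cpu_count() reports logical CPUs, which already accounts for hyperthreading:

NUM_PROCESSES = mp.cpu_count() #number of logical CPUs, e.g. 4 on my machine
pool = mp.Pool(processes=NUM_PROCESSES)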

We will use the Pool class from multiprocessing. Be sure to go over the docs to learn about the other methods in this class. The two methods you are most likely to use are apply (apply a function to an object) and map (apply a function to every item of an iterable and return a list of the results).
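
As a quick sketch of the difference, using the pool and simple_sum defined above (starmap is a third method that unpacks argument tuples, handy for two-argument functions):

result = pool.apply(simple_sum, args=(1, 2)) #runs the function once in a worker, returns 3
lengths = pool.map(len, ['a', 'bb', 'ccc']) #one-argument function over an iterable, returns [1, 2, 3]
sums = pool.starmap(simple_sum, [(1, 2), (3, 4)]) #unpacks each tuple as arguments, returns [3, 7]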

With multiprocessing

%time add_sum = pool.apply(simple_sum, args=(a,b))

CPU times: user 781 µs, sys: 1.2 ms, total: 1.98 ms
Wall time: 1.71 ms

With dask,

#we first need to convert to dask objects
a = da.from_array(a, chunks=(100))
b = da.from_array(b, chunks=(100))

%time add_sum = simple_sum(a,b)

CPU times: user 1.34 ms, sys: 743 µs, total: 2.08 ms
Wall time: 2.21 ms
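
One caveat worth knowing: dask arrays are lazy, so the line above mostly measures building the task graph rather than doing the additions. To actually execute the graph and get a NumPy array back, call compute():

add_sum = simple_sum(a, b) #builds a lazy task graph, nothing is computed yet
result = add_sum.compute() #runs the graph and returns a regular NumPy array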

Without parallel programming (back to plain NumPy arrays),

a = np.arange(2000) #recreate the plain NumPy arrays
b = np.arange(2000)
%time add_sum = simple_sum(a,b)


CPU times: user 690 µs, sys: 2.41 ms, total: 3.1 ms
Wall time: 3.15 ms

As you can see, running this in parallel cut time by quite a bit. Let’s try looking at this with actual data.

Using Parallel Programming with Data

Let’s begin by using dask. Dask implements most of the methods you’d use on a pandas dataframe, so it is very straightforward to use.

def clean_text(row):
    #Simple function to remove number from each row in column. Takes a string
    clean = ''.join(i for i in row if not i.isdigit())
    return clean
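
For instance, on a single string:

print(clean_text('In 1989, the 44th session opened')) #-> 'In , the th session opened'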

df = pd.read_csv("un-general-debates.csv", encoding='utf-8') #regular pandas dataframe
ddf = dd.from_pandas(df, chunksize=25) #dask dataframe built from it, split into partitions

%time df['cleaned'] = df['text'].apply(clean_text)
%time ddf['cleaned'] = ddf['text'].apply(clean_text)

Without dask: CPU times: user 35.3 s, sys: 3.85 s, total: 39.1 s
Wall time: 36.6 s

With dask: CPU times: user 9.54 ms, sys: 4.11 ms, total: 13.7 ms
Wall time: 13.1 ms
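
As in the array example, dask is lazy here: the timing above reflects building the task graph, not actually running clean_text over every speech. When you need the cleaned column in memory, trigger the computation; passing meta (my guess at a sensible value below) also quiets dask’s warning about inferring the output type:

ddf['cleaned'] = ddf['text'].apply(clean_text, meta=('text', 'object')) #declare the output name and dtype up front
cleaned = ddf['cleaned'].compute() #computation actually happens here, returns a pandas Series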

That’s pretty sweet.

Let’s implement this using multiprocessing. Since pandas doesn’t really have a built-in parallel apply, I will read the file in chunks, concatenate them into a single dataframe, and then split that dataframe across workers with multiprocessing.

#---set up workers
NUM_PARTITIONS = 8 #number of data partitions
NUM_PROCESSES = 4 #number of workers
def parallel_df_computation(df, f):
    chunkDf = np.array_split(df, NUM_PARTITIONS)
    pool = Pool(NUM_PROCESSES)
    df = pd.concat(pool.map(f, chunkDf))
    pool.close()
    pool.join()
    return df
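
A small variation, if you prefer not to manage the pool by hand: Pool also works as a context manager, which shuts the worker processes down automatically when the block exits (the _v2 name is just for illustration):

def parallel_df_computation_v2(df, f):
    #same idea, but the with-block tears the pool down for us
    chunk_df = np.array_split(df, NUM_PARTITIONS)
    with Pool(NUM_PROCESSES) as pool:
        result = pd.concat(pool.map(f, chunk_df))
    return result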

def clean_text_v2(data):
    #Simple function to remove number from column. Takes a dataframe
    data = data['text'].str.replace(r'\d+', '', regex=True)
    return data

chunks = pd.read_csv('un-general-debates.csv',chunksize=100) #read df in chunks
dfM = pd.concat(chunks) #concatenate the chunks into a single dataframe
%time dfM['cleaned'] = parallel_df_computation(dfM, clean_text_v2)

With multiprocessing: CPU times: user 251 ms, sys: 334 ms, total: 586 ms
Wall time: 2.48 s
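
One bit of housekeeping: the pool we created back in the array example still has live worker processes. Once you are done with it, shut it down:

pool.close() #stop accepting new work
pool.join() #wait for the workers to exit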

Also very fast… and there you have it! I hope these techniques help make your code more efficient!

Appendix

import pandas as pd
import multiprocessing as mp
import dask.dataframe as dd
import dask.array as da
import numpy as np
from multiprocessing import Pool

#---Arrays
#--Simple arithmetic
def simple_sum(x, y):
    add = x+y
    return add

NUM_PROCESSES = 4
pool = mp.Pool(processes=NUM_PROCESSES)

#Normal
a = np.arange(2000) #create two arrays
b = np.arange(2000)
%time add_sum = simple_sum(a,b)

#Multiprocessing
%time add_sum = pool.apply(simple_sum, args=(a,b))

#Dask
a = da.from_array(a, chunks=(100))
b = da.from_array(b, chunks=(100))
%time add_sum = simple_sum(a,b)

#---Data
def clean_text(row):
    #Simple function to remove number from each row in column. Takes a string
    clean = ''.join(i for i in row if not i.isdigit())
    return clean
def parallel_df_computation(df, f):
    chunkDf = np.array_split(df, NUM_PARTITIONS)
    pool = Pool(NUM_PROCESSES)
    df = pd.concat(pool.map(f, chunkDf))
    pool.close()
    pool.join()
    return df

def clean_text_v2(data):
    #Simple function to remove number from column. Takes a dataframe
    data = data['text'].str.replace(r'\d+', '', regex=True)
    return data

#Dask
ddf = dd.from_pandas(pd.read_csv("un-general-debates.csv", encoding='utf-8'), chunksize=25)
%time ddf['cleaned'] = ddf['text'].apply(clean_text)

#No PP
df = pd.read_csv("un-general-debates.csv", encoding='utf-8')
%time df['cleaned'] = df['text'].apply(clean_text)
#Multiprocessing
NUM_PARTITIONS = 8 #number of data partitions
NUM_PROCESSES = 4 #number of workers

chunks = pd.read_csv('un-general-debates.csv',chunksize=100) #read df in chunks
dfM = pd.DataFrame()
dfM = pd.concat(chunks)
%time dfM['cleaned'] = parallel_df_computation(dfM, clean_text_v2) #multiprocessing PP

 

Posted by: Aisha Pectyo

Astrophysicist turned data rockstar who speaks code and has enough yarn and mod podge to survive a zombie apocalypse.
