Creating scalable NLP pipelines using PySpark and Nlphose

In this article we will see how we can use Nlphose along with PySpark to execute an NLP pipeline and gather information about the famous journey from Jules Verne’s book ‘Around the World in 80 Days’. Here is the link to the ⬇️ PySpark notebook used in this article.


In my personal experience, mining data from unstructured text requires the use of multiple techniques. There is no single model or library that typically offers everything you need, and you may often have to combine components written in different programming languages or frameworks. This is where my open source project Nlphose comes into the picture. Nlphose enables creation of complex NLP pipelines in seconds, for processing static files or streaming text, using a set of simple command line tools. You can perform multiple operations on text, like NER, sentiment analysis, chunking, language identification, Q&A, zero-shot classification and more, by executing a single command in the terminal. Spark is a widely used big data processing tool which can be used to parallelize such workloads.

Nlphose is based on the ‘Unix tools philosophy’. The idea is to create simple tools which can work together to accomplish multiple tasks. Nlphose scripts rely on standard file streams to read and write data and can be piped together to create complex natural language processing pipelines. Every script reads JSON from ‘Standard Input’ and writes to ‘Standard Output’. All the scripts expect the incoming JSON to be in a Nlphose-compliant format, and their output is Nlphose compliant as well. You can read more details about the architecture of Nlphose here.
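To make that contract concrete, here is a rough, illustrative sketch (not taken from the Nlphose documentation) of the kind of line-delimited record that flows between stages: one JSON document per line, carrying at least an ‘id’ and a ‘text’ field, which downstream scripts enrich with their own keys.

import json

# Illustrative only: a minimal Nlphose-style input record.
# Each pipeline stage reads documents like this from stdin, adds its own
# fields (entities, answers, etc.) and writes the enriched JSON to stdout.
record = {"id": "example-1", "text": "Phileas Fogg lived at No. 7, Saville Row, Burlington Gardens."}
print(json.dumps(record))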

How is Nlphose different?

Unlike SparkML, Nlphose by design does not support Spark/PySpark natively. Running Nlphose in PySpark relies on ‘Docker’ being installed on all nodes of the Spark cluster. This is easy to do when creating a new cluster in Google Dataproc (and should be similar for any other Spark distribution). Apart from Docker, Nlphose does not require any other dependency to be installed on the worker nodes of the Spark cluster. We use the ‘PipeLineExecutor’ class, described below, to execute a Nlphose pipeline from within PySpark.

Under the hood, this class uses the ‘subprocess’ module to spawn a new Docker container for a Spark task and executes a Nlphose pipeline inside it. I/O is performed using stdout and stdin. This sounds very unsophisticated, but that’s how I built Nlphose. It was never envisioned to support any specific computing framework or library. You can run Nlphose in many different ways, one of which is described here.
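To illustrate the mechanism, here is a stripped-down sketch of that interaction: spawn a single Nlphose stage in a container and pipe one document through it. The full ‘PipeLineExecutor’ class shown later adds looping over a dataframe, error handling and container cleanup.

import json
import subprocess

# Minimal sketch (for illustration): start a Nlphose container, write one
# JSON document to its stdin and read the enriched JSON back from stdout.
cmd = 'docker run -a stdin -a stdout -i code2k13/nlphose:latest /bin/bash -c "./entity.py"'
proc = subprocess.Popen([cmd], shell=True,
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE)
doc = {"id": "example-1", "text": "Phileas Fogg left London on Wednesday, October 2nd."}
out, _ = proc.communicate(input=(json.dumps(doc) + "\n").encode("utf8"))
print(out.decode("utf8"))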

Let’s begin

First we install a package which we will use later to construct a word cloud visualization.

!pip install wordcloud

The below command downloads the ebook ‘Around the World in 80 Days’ from gutenberg.org and uses a utility provided by Nlphose to segment the single text file into line-delimited JSON. Each JSON object has an ‘id’ and a ‘text’ attribute.

!docker run code2k13/nlphose:latest \
/bin/bash -c "wget https://www.gutenberg.org/files/103/103-0.txt && ./file2json.py 103-0.txt -n 2" > ebook.json
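To get a feel for the output, you can peek at the first record of ebook.json (a quick, illustrative check that is not part of the original notebook; the exact ‘id’ values will differ on every run):

import json

# Inspect the first line-delimited JSON record produced by file2json.py.
with open("ebook.json") as f:
    first = json.loads(f.readline())
print(list(first.keys()))   # expect at least 'id' and 'text'
print(first["text"][:80])   # a snippet of the segmented text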

We delete all docker containers which we no longer need.

!docker system prune -f

Let’s import all the required libraries, read the JSON file which we created earlier and convert it into a pandas dataframe. Then we append a ‘group_id’ column which assigns a group ID of 0, 1 or 2 to each row in a round-robin fashion. Once that is done we create a new PySpark dataframe and display some results.

import pandas as pd
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import udf

df_pd = pd.read_json("ebook.json",lines=True)
df_pd['group_id'] = [i for i in range(0,3)]*347
df = spark.createDataFrame(df_pd)
df.show()
+---------+--------------------+--------------------+--------+
|file_name| id| text|group_id|
+---------+--------------------+--------------------+--------+
|103-0.txt|2bbbfe64-7c1e-11e...|The Project Gute...| 0|
|103-0.txt|2bbea7ea-7c1e-11e...| Title: Around th...| 1|
|103-0.txt|2bbf2eb8-7c1e-11e...|IN WHICH PHILEAS ...| 2|
|103-0.txt|2bbfdbd8-7c1e-11e...| Certainly an Eng...| 0|
|103-0.txt|2bbff29e-7c1e-11e...| Phileas Fogg was...| 1|
|103-0.txt|2bc00734-7c1e-11e...| The way in which...| 2|
|103-0.txt|2bc02570-7c1e-11e...| He was recommend...| 0|
|103-0.txt|2bc095f0-7c1e-11e...| Was Phileas Fogg...| 1|
|103-0.txt|2bc0ed20-7c1e-11e...| Had he travelled...| 2|
|103-0.txt|2bc159d6-7c1e-11e...| It was at least ...| 0|
|103-0.txt|2bc1a3be-7c1e-11e...| Phileas Fogg was...| 1|
|103-0.txt|2bc2a2aa-7c1e-11e...|He breakfasted an...| 2|
|103-0.txt|2bc2c280-7c1e-11e...| If to live in th...| 0|
|103-0.txt|2bc30b3c-7c1e-11e...| The mansion in S...| 1|
|103-0.txt|2bc34dd6-7c1e-11e...| Phileas Fogg was...| 2|
|103-0.txt|2bc35f88-7c1e-11e...|Fogg would, accor...| 0|
|103-0.txt|2bc3772a-7c1e-11e...| A rap at this mo...| 1|
|103-0.txt|2bc3818e-7c1e-11e...| “The new servant...| 2|
|103-0.txt|2bc38e0e-7c1e-11e...| A young man of t...| 0|
|103-0.txt|2bc45c6c-7c1e-11e...| “You are a Frenc...| 1|
+---------+--------------------+--------------------+--------+

Running the Nlphose pipeline using PySpark

As discussed earlier, Nlphose does not have native integration with PySpark/Spark, so we create a class called ‘PipeLineExecutor’ that starts a Docker container and executes a Nlphose command. This class communicates with the Docker container using ‘stdin’ and ‘stdout’. Finally, when the Docker container completes execution, we execute ‘docker system prune -f’ to clear any unused containers. The ‘execute_pipeline’ method writes data from the dataframe to stdin (line by line), reads the output from stdout and returns a dataframe created from that output.

import subprocess
import pandas as pd
import json

class PipeLineExecutor:
    def __init__(self, nlphose_command, data, id_column='id', text_column='text'):
        self.nlphose_command = nlphose_command
        self.id_column = id_column
        self.text_column = text_column
        self.data = data

    def execute_pipeline(self):
        try:
            # Remove any stopped containers before starting a new one.
            prune_proc = subprocess.Popen(["docker system prune -f"], shell=True)
            prune_proc.communicate()

            # Start the Nlphose docker container and attach to its stdin/stdout.
            proc = subprocess.Popen([self.nlphose_command], shell=True,
                                    stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)

            # Write one Nlphose-compliant JSON document per row to stdin.
            for idx, row in self.data.iterrows():
                proc.stdin.write(bytes(json.dumps({"id": row[self.id_column], "text": row[self.text_column]}), "utf8"))
                proc.stdin.write(b"\n")
                proc.stdin.flush()

            # Read everything the pipeline wrote to stdout, one document per line.
            output, error = proc.communicate()
            output_str = str(output, 'utf-8')
            data = output_str.split("\n")
            data = [d for d in data if len(d) > 2]
        finally:
            # Clean up the container once we are done.
            prune_proc = subprocess.Popen(["docker system prune -f"], shell=True)
            prune_proc.communicate()
        return pd.DataFrame(data)

The Nlphose command

The below command does multiple things:

  • It starts a Docker container using the code2k13/nlphose:latest image from Docker Hub.
  • It redirects stdin, stdout and stderr of the host into the Docker container.
  • It then runs a Nlphose command inside the container which performs named entity recognition (./entity.py) followed by extractive question answering (./xformer.py with the question ‘what did they carry?’) on the JSON coming from ‘stdin’, and writes the output to ‘stdout’:
command =  '''
docker run -a stdin -a stdout -a stderr -i code2k13/nlphose:latest /bin/bash -c "./entity.py |\
./xformer.py --pipeline question-answering --param 'what did they carry?'
"
'''
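Before scaling out with Spark, you can sanity-check the pipeline locally on a handful of rows. The snippet below is an illustrative check that is not part of the original notebook; it simply reuses the ‘PipeLineExecutor’ class with the command defined above and the pandas dataframe we built earlier.

# Illustrative sanity check: run the Nlphose command on the first few
# paragraphs only, using the PipeLineExecutor class defined earlier.
sample = df_pd.head(5)
executor = PipeLineExecutor(command, sample, id_column="id", text_column="text")
sample_result = executor.execute_pipeline()
print(sample_result.head())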

The below function formats the data returned by the PipeLineExecutor task. The dataframe returned by ‘PipeLineExecutor.execute_pipeline’ has a single string column containing the output from the Nlphose command; each row represents one line/document output by the Nlphose command.

def get_answer(row):
    try:
        x = json.loads(row[0], strict=False)
        row['json_obj'] = json.dumps(x)
        # Keep the answer only if the model is sufficiently confident.
        if x['xfrmr_question_answering']['score'] > 0.80:
            row['id'] = str(x['id'])
            row['answer'] = x['xfrmr_question_answering']['answer']
        else:
            row['id'] = str(x['id'])
            row['answer'] = None

    except Exception as e:
        row['id'] = None
        row['answer'] = "ERROR " + str(e)
        row['json_obj'] = None

    return row
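For reference, the snippet below feeds get_answer a hand-written, abbreviated record containing only the fields the function actually reads. The values are made up for illustration and do not come from a real Nlphose run.

# Hypothetical, abbreviated record: only the keys that get_answer inspects.
sample_line = json.dumps({
    "id": "example-1",
    "xfrmr_question_answering": {"answer": "a carpet-bag", "score": 0.93}
})
print(get_answer(pd.Series([sample_line])))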

The below function creates a ‘PipeLineExecutor’ object, passes the data to it and then calls its ‘execute_pipeline’ method. It then uses the ‘get_answer’ function to format the output of ‘execute_pipeline’.

def run_pipeline(data):
    nlphose_executor = PipeLineExecutor(command, data, "id", "text")
    result = nlphose_executor.execute_pipeline()
    result = result.apply(get_answer, axis=1)
    return result[["id", "answer", "json_obj"]]

Scaling the pipeline using PySpark

We use ‘applyInPandas‘ from PySpark to parallelize and process the text at scale. PySpark automatically handles scaling of the Nlphose pipeline on the Spark cluster. The ‘run_pipeline’ method is invoked once for every ‘group’ of the input data, so it is important to choose an appropriate number of groups based on the number of nodes in order to process data efficiently on the Spark cluster.

output = df.groupby("group_id").applyInPandas(run_pipeline, schema="id string,answer string,json_obj string")
output.cache()
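As an illustration (not part of the original notebook), if your cluster has more executor cores than the three groups used above, you could re-derive ‘group_id’ with a hypothetical ‘num_groups’ value chosen to roughly match the available parallelism, mirroring the earlier round-robin assignment. The resulting ‘output’ dataframe is the same, just computed with more groups running in parallel.

# Hypothetical re-grouping: bucket rows into num_groups partitions
# round-robin, so applyInPandas can run that many pipelines in parallel.
num_groups = 8  # tune this to the size of your cluster
df_pd["group_id"] = [i % num_groups for i in range(len(df_pd))]
df = spark.createDataFrame(df_pd)
output = df.groupby("group_id").applyInPandas(run_pipeline, schema="id string,answer string,json_obj string")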

Visualizing our findings

Once we are done executing the Nlphose pipeline, we set out to visualize our findings. I have created two visualizations:

  • A map showing places mentioned in the book.
  • A word cloud of all the important items the characters carried for their journey.

Plotting the most common locations from the book on a world map

The below code extracts latitude and longitude information from the Nlphose pipeline output and creates a list of the most common locations.

💡 Note: Nlphose entity extraction will automatically guess coordinates for well known locations using a dictionary-based approach.

def get_latlon2(data):
    json_obj = json.loads(data)
    if 'entities' in json_obj.keys():
        for e in json_obj['entities']:
            # Only keep geo-political entities that have coordinates attached.
            if e['label'] == 'GPE' and 'cords' in e.keys():
                return json.dumps({'data': [e['entity'], e['cords']['lat'], e['cords']['lon']]})
    return None

get_latlon_udf2 = udf(get_latlon2)
df_locations = output.withColumn("locations", get_latlon_udf2(output["json_obj"]))
top_locations = df_locations.filter("`locations` != 'null'").groupby("locations").count().sort(desc("count")).filter("`count` >= 1")
top_locations.cache()
top_locations.show()

Then we use the ‘geopandas’ package to plot these locations on a world map. Before we do that, we have to transform our dataframe into the format understood by ‘geopandas’. This is done by applying the function ‘add_lat_long’.

import geopandas

def add_lat_long(row):
    # The 'locations' column holds a JSON string of the form {'data': [entity, lat, lon]}.
    obj = json.loads(row[0])["data"]
    row["lat"] = obj[1]
    row["lon"] = obj[2]
    return row

df_locations = top_locations.toPandas()
df_locations = df_locations.apply(add_lat_long, axis=1)

gdf = geopandas.GeoDataFrame(df_locations, geometry=geopandas.points_from_xy(df_locations.lon, df_locations.lat))
world = geopandas.read_file(geopandas.datasets.get_path('naturalearth_lowres'))
ax = world.plot(color=(25/255, 211/255, 243/255), edgecolor=(25/255, 211/255, 243/255),
                linewidth=0.4, edgecolors='none', figsize=(15, 15))
ax.axis('off')
gdf.plot(ax=ax, alpha=0.5, marker=".", markersize=df_locations['count']*100, color='seagreen')

If you are familiar with the book, you will realize that we have almost plotted the actual route taken by Fogg on his famous journey.

For reference, shown below is an image of the actual route taken by him, from Wikipedia.

Around the World in Eighty Days map

Roke, CC BY-SA 3.0, via Wikimedia Commons

Creating a word cloud of items carried by Fogg on his journey

The below code finds the most common items carried by the travellers using ‘extractive question answering’ and creates a word cloud.

from wordcloud import WordCloud, STOPWORDS
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure

figure(figsize=(12, 6), dpi=120)
answers = output.filter("`answer` != 'null'").toPandas()['answer'].tolist()
wordcloud = WordCloud(background_color='white', width=1024, height=500).generate(' '.join(answers))
plt.imshow(wordcloud)
plt.axis("off")
plt.show()

Conclusion

One of the reasons why PySpark is a favorite tool of data scientists and ML practitioners is that working with dataframes is very convenient. This article showed how we can run Nlphose on a Spark cluster using PySpark. Using the approach described here, we can very easily embed a Nlphose pipeline into our data processing pipelines. I hope you liked this article. Feedback and comments are always welcome, thank you!