Download and preprocess MEDLINE dataset


Create a data folder and download the MEDLINE baseline files (the bulk download URLs are listed on the NCBI FTP server):

wget ftp://ftp.ncbi.nlm.nih.gov/pubmed/baseline/*.gz

Then, download the update files:

wget ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/*.gz

After parsing, this should yield around 29 million MEDLINE articles. Alternatively, you can sync the FTP files using the following commands:

mkdir -p ftp_mount baseline
curlftpfs ftp://ftp.ncbi.nlm.nih.gov/pubmed/baseline/ ftp_mount/
rsync -r -t -v --progress ftp_mount/*.gz baseline/
fusermount -u ftp_mount

The steps below assume that pubmed_parser is importable on every Spark worker. If it is not on the path, you can build an egg file by running python setup.py bdist_egg inside the repository and add it to the Spark context manually:

spark.sparkContext.addPyFile('pubmed_parser/dist/pubmed_parser-0.1-py3.5.egg') # building with Python 3.5
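Note that spark above refers to an existing SparkSession (the pyspark shell provides one automatically). If you run this as a standalone script instead, create the session first; a minimal sketch, where the app name is only a placeholder:

from pyspark.sql import SparkSession

# create a SparkSession if one is not already provided by the environment
spark = SparkSession.builder \
    .appName('medline-preprocessing') \
    .getOrCreate()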

Then, run the following to preprocess all files:

import os
from glob import glob
import pubmed_parser as pp
from pyspark.sql import SparkSession
from pyspark.sql import Row

# distribute the list of gzipped MEDLINE XML files across the cluster
medline_files_rdd = spark.sparkContext.parallelize(glob('data/*.gz'), numSlices=1000)

# parse each XML file and flatten the results into one Row per article,
# keeping the source file name so that newer update files can be identified later
parse_results_rdd = medline_files_rdd.\
    flatMap(lambda x: [Row(file_name=os.path.basename(x), **publication_dict) 
                       for publication_dict in pp.parse_medline_xml(x)])

medline_df = parse_results_rdd.toDF()
# save to parquet
medline_df.write.parquet('raw_medline.parquet', mode='overwrite')
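Optionally, read the parquet back for a quick sanity check; with the full baseline plus updates, the row count should be on the order of 29 million:

# optional sanity check on the saved parquet
raw_df = spark.read.parquet('raw_medline.parquet')
raw_df.printSchema()
print(raw_df.count())  # roughly 29 million rows for baseline + updates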

Now, we will process the updates and deletions, keeping only the most recent, non-deleted version of each article.

medline_df = spark.read.parquet('raw_medline.parquet')

from pyspark.sql import Window
from pyspark.sql.functions import rank, max, desc

# for each PMID, order its records so that the newest file comes first
window = Window.partitionBy(['pmid']).orderBy(desc('file_name'))

windowed_df = medline_df.select(
    max('delete').over(window).alias('is_deleted'),
    rank().over(window).alias('pos'),
    '*')

# keep only the newest, non-deleted record for each PMID
windowed_df.where('is_deleted = False and pos = 1').\
    write.\
    parquet('medline_lastview.parquet', mode='overwrite')
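As a quick check, each PMID should now appear exactly once in the last-view table:

from pyspark.sql import functions as F

# optional check: every PMID should appear exactly once after deduplication
lastview_df = spark.read.parquet('medline_lastview.parquet')
n_duplicates = lastview_df.groupBy('pmid').count().where(F.col('count') > 1).count()
print(n_duplicates)  # expected to be 0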

To build a grant ID table, parse the grant information from the same files:

# parse grant information from the same set of MEDLINE files
grant_df = medline_files_rdd.\
    flatMap(lambda x: [Row(file_name=os.path.basename(x), **grant_dict) 
                       for grant_dict in pp.parse_medline_grant_id(x)]).\
    toDF()
grant_df.write.parquet('grants_df.parquet', mode='overwrite')
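The grant table can then be queried like any other DataFrame, for example to count grants per funding agency; this sketch assumes the parsed grant dictionaries include an agency field, so adjust it to the actual schema if yours differs:

# hypothetical example: number of grants per funding agency
# (assumes an 'agency' column in the parsed grant output)
grant_df = spark.read.parquet('grants_df.parquet')
grant_df.groupBy('agency').count().orderBy('count', ascending=False).show(20)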

To compute TF-IDF features using PySpark:

from pyspark.sql import functions

final_results_df = spark.read.parquet('medline_lastview.parquet')
# create a content column that concatenates author, affiliation, title, and abstract
content_df = final_results_df.select(final_results_df.pmid, 
                        functions.trim(functions.concat_ws(' ', 
                            final_results_df.author, 
                            final_results_df.affiliation, 
                            final_results_df.title,
                            final_results_df.abstract)).alias('content'))

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer, IDF, StopWordsRemover

# tokenize, remove stop words, hash to term frequencies, then weight by IDF
tokenizer = Tokenizer(inputCol='content', outputCol='words')
stopwordsremover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='words_wo_stop')
hashingTF = HashingTF(inputCol=stopwordsremover.getOutputCol(), outputCol='features', numFeatures=2**18)
idf = IDF(minDocFreq=3, inputCol=hashingTF.getOutputCol(), outputCol='tfidf')

tfidf_pipeline = Pipeline(stages=[tokenizer, stopwordsremover, hashingTF, idf])
tfidf_model = tfidf_pipeline.fit(content_df)
tfidf_df = tfidf_model.transform(content_df).select('pmid', 'tfidf')

tfidf_model.save('tfidf.model')

tfidf_df.write.parquet('tfidf.parquet', mode='overwrite')
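The fitted pipeline can later be reloaded and applied to new text; a minimal sketch, where the example document is made up:

from pyspark.ml import PipelineModel
from pyspark.sql import Row

# reload the fitted TF-IDF pipeline and transform a new document
tfidf_model = PipelineModel.load('tfidf.model')
new_docs = spark.createDataFrame([Row(pmid='0', content='example biomedical abstract text')])
tfidf_model.transform(new_docs).select('pmid', 'tfidf').show(truncate=False)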