SemanticScholar.py
"""Generate summary metadata for the Semantic Scholar dump.

Counts papers, citations, unique citations and self-citations, joins the
SCImago (2021) journal ranking to derive per-quartile paper counts, and
writes a one-row metadata.parquet summary next to the input data.
"""
from configparser import ConfigParser
from pathlib import Path

import numpy as np
import pandas as pd

# import plotly.express as px
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Window


def gen_SS_metadata(spark):
    # Define directories (the Semantic Scholar data path comes from config.cf)
    cf = ConfigParser()
    cf.read("config.cf")
    dir_db = Path(cf.get("data", "semanticscholar"))

    #################################################
    #### Load info
    #################################################
    ss = spark.read.parquet(dir_db.joinpath("papers.parquet").as_posix())
    ss_cit = spark.read.parquet(dir_db.joinpath("citations.parquet").as_posix())

    #################################################
    #### Citation information
    #################################################
    # Number duplicate (source, dest) pairs and flag self-citations
    window = Window.partitionBy("source", "dest").orderBy("source", "dest")
    ss_cit = ss_cit.withColumn("dup", F.row_number().over(window)).withColumn(
        "autoCit", F.col("source") == F.col("dest")
    )
    ss_cit_unique = ss_cit.where(F.col("dup") == 1)

    #################################################
    #### Quartile information
    #################################################
    # Load the SCImago journal ranking and join it on (lower-cased) journal name
    df = pd.read_csv("scimagojr 2021.csv", sep=";")
    df = df.fillna(np.nan).replace([np.nan], [None])
    scimago = spark.createDataFrame(df)

    ss = ss.join(scimago, F.lower(ss.journalName) == F.lower(scimago.Title), "left")
    ss_journ = ss.where(F.length("journalName") > 0)
    ss_journ_val = ss_journ.where(F.col("Rank").isNotNull())

    #################################################
    #### Get counts
    #################################################
    tot_ss = ss.count()
    tot_journ = ss_journ.count()
    tot_journ_val = ss_journ_val.count()
    ss_tot = ss_cit.count()
    ss_tot_unique = ss_cit_unique.count()
    ss_tot_unique_self = ss_cit_unique.where(F.col("autoCit")).count()

    print(f"Total papers in SS: {tot_ss}")
    print(f"Total papers in SS with journal: {tot_journ} ({tot_journ / tot_ss * 100:.3f}%)")
    print(f"Total papers in SS with valid journal: {tot_journ_val} ({tot_journ_val / tot_ss * 100:.3f}%)")
    print(f"Number citations: {ss_tot}")
    print(f"Number unique citations: {ss_tot_unique} ({ss_tot_unique / ss_tot * 100:.3f}%)")
    print(f"Number self-citations: {ss_tot_unique_self} ({ss_tot_unique_self / ss_tot * 100:.3f}%)")

    # Per-year counts of papers by SJR best quartile
    df_year_SJR = (
        ss_journ.select("year", "SJR Best Quartile")
        .groupBy("year", "SJR Best Quartile")
        .count()
        .toPandas()
    )
    # NOTE: the result of this groupby is immediately overwritten below
    df = df_year_SJR.groupby(["year", "SJR Best Quartile"]).agg(lambda x: x)
    df = df_year_SJR.rename(
        columns={"year": "Year", "SJR Best Quartile": "Quartile", "count": "Count"}
    )
    # df = df[df["Year"] > 1950]
    # Sort quartiles alphabetically, pushing None (no quartile) to the end;
    # only used by the commented-out plot below
    quartile_order = sorted(
        df_year_SJR["SJR Best Quartile"].unique(),
        key=lambda x: "zz" if x is None else x,
    )
    # Total paper count per quartile, e.g. {"Q1": ..., "Q2": ...}
    quarts = df[["Quartile", "Count"]].groupby(["Quartile"]).sum().to_dict()["Count"]
    # fig = px.bar(
    #     df,
    #     x="Year",
    #     y="Count",
    #     color="Quartile",
    #     # barmode="group",
    #     # log_y=True,
    #     category_orders={"Quartile": quartile_order},
    #     title="",
    # )
    # fig.update_layout(
    #     {
    #         "xaxis.rangeslider.visible": True,
    #         "yaxis.fixedrange": True,
    #     }
    # )
    # fig.show()

    #################################################
    #### Output data
    #################################################
    # Single-row summary: total citations, citations excluding self-citations,
    # and the per-quartile paper counts
    columns = ["Num_cit", "Num_cit_wo_self", "quartiles"]
    row = [ss_tot, ss_tot - ss_tot_unique_self, quarts]
    data = [row]
    df = spark.createDataFrame(data=data, schema=columns)
    df.printSchema()
    df.show(truncate=False)

    df.write.parquet(
        dir_db.joinpath("metadata.parquet").as_posix(),
        mode="overwrite",
    )


if __name__ == "__main__":
    # Create session
    spark = SparkSession.builder.appName("WP3pipeline").getOrCreate()
    sc = spark.sparkContext
    print(sc.version)

    gen_SS_metadata(spark)
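
# ------------------------------------------------------------------
# A minimal sketch of the expected inputs, based only on what the code
# above reads; the path value is an assumption and will differ locally:
#
#   config.cf
#     [data]
#     semanticscholar = /path/to/semantic_scholar
#
# The script also expects "scimagojr 2021.csv" (a SCImago journal-rank
# export with Title, Rank and "SJR Best Quartile" columns) in the working
# directory, and can be run e.g. with `spark-submit SemanticScholar.py`.
# ------------------------------------------------------------------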