-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplayertweets.py
82 lines (63 loc) · 2.28 KB
/
playertweets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import http.client
import tweepy
import json
import pandas as pd
import os
import sys
import pyspark
import findspark
findspark.init()
import re
import config
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import http.client
import mimetypes
import twarc
from random import sample
import pyarrow.parquet as pq
import pyarrow as pa
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType, DecimalType, DateType
from pyspark.sql.functions import *
# Reuse the active Spark session, or start one, under the app name "PySpark App".
spark = (
    SparkSession.builder
    .appName("PySpark App")
    .getOrCreate()
)
# Twitter API v2 client authenticated with the bearer token from config.
client = tweepy.Client(bearer_token=config.Bearer_Token)

# Player reference data, read from the "players_files" parquet dataset.
df_players = spark.read.parquet("players_files")

# Collect both columns from the same rows in one Spark job, so every name is
# paired with the id from its own row. Two independent collect() calls would
# run two jobs and pair the values purely by position.
player_rows = df_players.select("name", "playerId").collect()
players = [row["name"] for row in player_rows]
player_id = [row["playerId"] for row in player_rows]

# Lookup from player name to player id; a repeated name keeps its last id.
player_dict = dict(zip(players, player_id))
def export_parquet_table(df, file_name):
    """Write *df* as parquet to "<file_name>_files", overwriting existing output."""
    target_path = f"{file_name!s}_files"
    overwrite_writer = df.write.mode('overwrite')
    overwrite_writer.parquet(target_path)
# Map each player name to the (tweet id, tweet text) pairs returned by a
# recent-tweet search for that name, with retweets excluded.
tweets_dict = {}
for player in players:
    query = str(player) + " -is:retweet"
    tweets = client.search_recent_tweets(query=query)
    # tweepy leaves Response.data as None when the search matches nothing.
    # Skip those players so the loop does not fail on iteration and the
    # mapping only holds players that have at least one tweet.
    if not tweets.data:
        continue
    tweets_dict[player] = [(tweet.id, tweet.text) for tweet in tweets.data]
# Layout of the tweets table: three nullable string columns, in this order.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ('playerId', 'Tweet', 'Tweet_id')
])
# Flatten {player name: [(tweet id, tweet text), ...]} into one row per tweet,
# with columns in the order `schema` declares: playerId, Tweet, Tweet_id.
# Iterating the pairs directly means a player with an empty tweet list simply
# contributes no rows; explode() would turn it into a NaN row that cannot be
# split back into (id, text).
tweet_rows = [
    # Tweet ids arrive as integers; cast explicitly because the schema column
    # is a string, rather than relying on implicit coercion during conversion.
    (player_dict.get(player), tweet_text, str(tweet_id))
    for player, id_text_pairs in tweets_dict.items()
    for tweet_id, tweet_text in id_text_pairs
]
df_tweets = pd.DataFrame(tweet_rows, columns=['playerId', 'Tweet', 'Tweet_id'])

# Convert to a Spark DataFrame, print a preview, and persist as "tweets_files".
df_tweets = spark.createDataFrame(df_tweets, schema)
df_tweets.show()
export_parquet_table(df_tweets, 'tweets')