-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathfacebook_messenger_conversation.py
233 lines (191 loc) · 7.79 KB
/
facebook_messenger_conversation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import json
import sys
from collections import Counter
from datetime import datetime, timedelta

import emoji
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.backends.backend_pdf import PdfPages
from matplotlib.dates import DateFormatter
class FacebookMessengerConversation:
    """Module for getting stats of a Facebook Messenger conversation.

    Attributes:
        data (dict): The conversation of interest.
        title (str): Title of the conversation.
        p (list): List of conversation participants.
    """

    def __init__(self, conversation):
        """Prepares `conversation` and fetches its participants.

        Args:
            conversation (str): Path to a conversation JSON file
                downloaded from Facebook (see https://www.facebook.com/
                help/212802592074644?helpref=uf_permalink)
        """
        # Use a context manager so the file handle is always closed.
        with open(conversation, encoding='utf-8') as conversation_file:
            self.data = json.load(conversation_file)

        # Convert unicode characters
        repair = self._repair_text
        self.title = repair(self.data['title'])
        for participant in self.data['participants']:
            participant['name'] = repair(participant['name'])
        for message in self.data['messages']:
            message['sender_name'] = repair(message['sender_name'])
            if 'content' in message:
                message['content'] = repair(message['content'])

        # Set names of conversation participants
        self.p = [participant['name']
                  for participant in self.data['participants']]

    @staticmethod
    def _repair_text(text):
        """Undoes the UTF-8-read-as-Latin-1 mangling of exported text.

        Args:
            text (str): A string taken from the exported JSON.

        Returns:
            str: The repaired string, or `text` unchanged when it is
                not mangled (it cannot be mapped back to bytes or those
                bytes are not valid UTF-8).
        """
        try:
            return text.encode('latin-1').decode('utf-8')
        except UnicodeError:
            return text

    @staticmethod
    def _emoji_names():
        """Returns the set of English emoji short names (':name:')."""
        try:
            return set(emoji.UNICODE_EMOJI['en'].values())
        except AttributeError:
            # NOTE(review): emoji >= 2.0 dropped UNICODE_EMOJI and
            # exposes EMOJI_DATA instead - confirm against the
            # installed version.
            return {entry['en'] for entry in emoji.EMOJI_DATA.values()
                    if 'en' in entry}

    def get_participants(self):
        """Returns the names of the conversation participants.

        Returns:
            list: Contains the conversation participants
        """
        return self.p

    def get_time_interval(self, type):
        """Returns the start and end time of the conversation.

        The earliest and latest timestamps are used, so the result does
        not depend on the order of the messages. Times are expressed in
        the local time zone (`datetime.fromtimestamp`).

        Args:
            type (str): Decides what type should be returned. Either
                'datetime' or 'str'.

        Returns:
            tuple: (start, end). Either as datetimes or strings.

        Raises:
            IndexError: If the conversation contains no messages.
            ValueError: If a not supported `type` was entered.
        """
        messages = self.data['messages']
        if not messages:
            raise IndexError('The conversation contains no messages.')
        timestamps = [message['timestamp_ms'] for message in messages]
        start = datetime.fromtimestamp(min(timestamps) / 1000)
        end = datetime.fromtimestamp(max(timestamps) / 1000)
        if type == 'datetime':
            return start, end
        if type == 'str':
            time_format = '%Y-%m-%d %H:%M:%S'
            return start.strftime(time_format), end.strftime(time_format)
        raise ValueError('Type not supported. Must be '
                         'either datetime or str.')

    def get_nbr_days(self):
        """Returns the number days between the first and last message.

        Counts whole 24-hour periods between the two messages, plus one.

        Returns:
            int: Days between start and end.
        """
        start, end = self.get_time_interval('datetime')
        return (end - start).days + 1

    def get_nbr_msg(self):
        """Returns the total number of messages.

        Returns:
            int: Number of messages.
        """
        return len(self.data['messages'])

    def get_nbr_words(self):
        """Returns the total number of words.

        Messages without a 'content' entry contribute no words.

        Returns:
            int: Number of words.
        """
        return sum(len(message['content'].split())
                   for message in self.data['messages']
                   if 'content' in message)

    def get_avg_len_msg(self):
        """Returns the average length of a message.

        Returns:
            float: Average number of words per message, rounded to one
                decimal. 0.0 for a conversation without messages.
        """
        nbr_msg = self.get_nbr_msg()
        if nbr_msg == 0:
            return 0.0
        return round(self.get_nbr_words() / nbr_msg, 1)

    def get_avg_msg_day(self):
        """Returns the average number of messages sent each day.

        Returns:
            float: Average number of messages sent per day, rounded to
                one decimal. 0.0 for a conversation without messages.
        """
        nbr_msg = self.get_nbr_msg()
        if nbr_msg == 0:
            return 0.0
        return round(nbr_msg / self.get_nbr_days(), 1)

    def activity(self):
        """Activity by each conversation participant.

        Messages from senders that are not listed as participants are
        not attributed to anyone, but still count towards the total the
        percentages are based on.

        Returns:
            dict: Contains a list (value) with the number of messages
                sent and the percentage it corresponds to (rounded to a
                whole percent) per participant (key).
        """
        nbr_msg = self.get_nbr_msg()
        sent = {p: 0 for p in self.p}
        for message in self.data['messages']:
            sender = message['sender_name']
            if sender in sent:
                sent[sender] += 1

        act = {}
        for name, count in sent.items():
            # Round after scaling to avoid values like 28.999999999999996.
            share = float(round(100 * count / nbr_msg)) if nbr_msg else 0.0
            act[name] = [count, share]
        return act

    def timeline(self):
        """Fetches data when messages are sent.

        Returns:
            tuple: Containing which days messages were sent (every
                calendar day from the first to the last message) and
                also how many were sent per day, weekday and hour.

        Raises:
            IndexError: If the conversation contains no messages.
        """
        start, end = self.get_time_interval('datetime')
        first_day = start.date()
        # Size by calendar days, not by elapsed 24-hour periods, so that
        # every message maps to a valid slot.
        nbr_days = (end.date() - first_day).days + 1
        timeline = [first_day + timedelta(days=offset)
                    for offset in range(nbr_days)]
        nbr_times_day = [0] * nbr_days
        nbr_times_weekday = [0] * 7
        nbr_times_hour = [0] * 24

        for message in self.data['messages']:
            sent = datetime.fromtimestamp(message['timestamp_ms'] / 1000)
            # Round to the nearest full hour; 24 wraps around to 0.
            h = int(round(sent.hour + sent.minute / 60.
                          + sent.second / 3600)) % 24
            nbr_times_hour[h] += 1
            nbr_times_weekday[sent.weekday()] += 1
            nbr_times_day[(sent.date() - first_day).days] += 1

        return timeline, nbr_times_day, nbr_times_weekday, nbr_times_hour

    def top_emojis(self, nbr):
        """Returns the top `nbr` emojis used and who sent them.

        Only emojis sent by listed participants are counted. Messages
        are inspected one character at a time, so an emoji made up of
        several code points is seen as its individual code points.
        Ties are broken by emoji name; unused emojis fill the list when
        fewer than `nbr` different emojis were sent.

        Args:
            nbr (int): The number of emojis to include in top list.

        Returns:
            tuple: List of top emojis and dict showing how many of
                these were sent by each participant.
        """
        names = self._emoji_names()
        totals = Counter()
        per_sender = {p: Counter() for p in self.p}
        for message in self.data['messages']:
            if 'content' not in message:
                continue
            sender = message['sender_name']
            if sender not in per_sender:
                continue
            for character in message['content']:
                name = emoji.demojize(character)
                if name in names:
                    per_sender[sender][name] += 1
                    totals[name] += 1

        # Counter lookups return 0 for names that never occurred.
        top_names = sorted(
            names, key=lambda name: (-totals[name], name))[:nbr]
        emojis_count_p = {p: [per_sender[p][name] for name in top_names]
                          for p in self.p}
        top_emojis = [emoji.emojize(name) for name in top_names]
        return top_emojis, emojis_count_p