-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathkeyword_extraction.py
156 lines (126 loc) · 5.19 KB
/
keyword_extraction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# -*- coding: utf-8 -*-
"""Keyword_Extraction_TextRank.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1KgpSSqSlbslomyrLpzdAhUONk9eQyRfX
* [Blog: Explanation of Keyword Extraction using Spacy and TextRank](https://towardsdatascience.com/textrank-for-keyword-extraction-by-python-c0bae21bcec0)
* [Github](https://gist.github.com/BrambleXu/3d47bbdbd1ee4e6fc695b0ddb88cbf99)
"""
from collections import OrderedDict
import numpy as np
import spacy
from spacy.lang.en.stop_words import STOP_WORDS
import nltk
# nltk.download('punkt')
# Shared spaCy English pipeline, loaded once at import time; TextRank uses it
# both to parse text and to flag stop words on its vocabulary.
nlp = spacy.load('en_core_web_sm')
class TextRank():
    """Extract keywords from text with a TextRank-style graph ranking.

    Typical use: call ``analyze(text)`` to compute a weight per candidate
    word, then ``get_keywords(n)`` to read back the ``n`` highest-weighted
    words.
    """

    def __init__(self):
        self.d = 0.85  # damping coefficient, usually is .85
        self.min_diff = 1e-5  # convergence threshold
        self.steps = 10  # iteration steps
        self.node_weight = None  # word -> weight mapping, filled by analyze()

    def set_stopwords(self, stopwords):
        """Flag spaCy's default stop words plus ``stopwords`` as stop words.

        The flag is set on the vocabulary of the module-level ``nlp``
        pipeline, so it stays in effect for later calls as well.

        Args:
            stopwords: iterable of extra words to treat as stop words.
        """
        for word in STOP_WORDS.union(set(stopwords)):
            lexeme = nlp.vocab[word]
            lexeme.is_stop = True

    def sentence_segment(self, doc, candidate_pos, lower):
        """Keep, per sentence, only the non-stop tokens with a candidate POS.

        Args:
            doc: parsed spaCy document (iterated through ``doc.sents``).
            candidate_pos: container of POS tags (``token.pos_``) to keep.
            lower: when exactly ``True``, store the lower-cased token text.

        Returns:
            A list with one list of selected words per sentence.
        """
        sentences = []
        for sent in doc.sents:
            selected_words = []
            for token in sent:
                # Store words only with a candidate POS tag
                if token.pos_ in candidate_pos and token.is_stop is False:
                    if lower is True:
                        selected_words.append(token.text.lower())
                    else:
                        selected_words.append(token.text)
            sentences.append(selected_words)
        return sentences

    def get_vocab(self, sentences):
        """Map every distinct word to an index in first-seen order.

        Args:
            sentences: list of lists of words.

        Returns:
            An ``OrderedDict`` of word -> consecutive integer index from 0.
        """
        vocab = OrderedDict()
        for sentence in sentences:
            for word in sentence:
                if word not in vocab:
                    # The next free index is always the current size.
                    vocab[word] = len(vocab)
        return vocab

    def get_token_pairs(self, window_size, sentences):
        """Build the unique ordered word pairs found inside sliding windows.

        Each word is paired with the up to ``window_size - 1`` words that
        follow it in the same sentence.

        Args:
            window_size: size of the co-occurrence window, in words.
            sentences: list of lists of (hashable) words.

        Returns:
            A list of ``(word, following_word)`` tuples without duplicates,
            in order of first appearance.
        """
        token_pairs = []
        # A set gives O(1) duplicate checks; the list keeps the original order.
        seen = set()
        for sentence in sentences:
            length = len(sentence)
            for i, word in enumerate(sentence):
                for j in range(i + 1, min(i + window_size, length)):
                    pair = (word, sentence[j])
                    if pair not in seen:
                        seen.add(pair)
                        token_pairs.append(pair)
        return token_pairs

    def symmetrize(self, a):
        """Return ``a + a.T`` with the diagonal counted only once."""
        return a + a.T - np.diag(a.diagonal())

    def get_matrix(self, vocab, token_pairs):
        """Build the symmetric, column-normalized co-occurrence matrix.

        Args:
            vocab: mapping of word -> matrix index.
            token_pairs: iterable of ``(word1, word2)`` pairs from ``vocab``.

        Returns:
            A ``(len(vocab), len(vocab))`` float array in which every column
            with a non-zero sum adds up to 1 and every other column is zero.
        """
        # Build matrix
        vocab_size = len(vocab)
        g = np.zeros((vocab_size, vocab_size), dtype='float')
        for word1, word2 in token_pairs:
            i, j = vocab[word1], vocab[word2]
            g[i][j] = 1

        # Get symmetric matrix
        g = self.symmetrize(g)

        # Normalize matrix by column. `out` must be pre-filled: with `where`
        # alone, the skipped entries (columns whose sum is 0, i.e. words
        # without any pair) would be left as uninitialized memory.
        norm = np.sum(g, axis=0)
        g_norm = np.divide(g, norm, out=np.zeros_like(g), where=norm != 0)
        return g_norm

    def get_keywords(self, number=10):
        """Return the ``number`` highest-weighted keywords.

        ``analyze`` must have been called first so that ``node_weight`` is
        populated.

        Args:
            number: maximum number of keywords to return; values below 1
                yield an empty dict.

        Returns:
            A dict of word -> weight, ordered by descending weight.
        """
        ranked = sorted(self.node_weight.items(), key=lambda t: t[1], reverse=True)
        # Clamp so that a negative count cannot turn into a from-the-end slice.
        return dict(ranked[:max(number, 0)])

    def analyze(self, text,
                candidate_pos=('NOUN', 'PROPN'),
                window_size=4, lower=False, stopwords=()):
        """Compute a weight for every candidate word of ``text``.

        The result is stored in ``self.node_weight``; nothing is returned.

        Args:
            text: raw text to analyze.
            candidate_pos: POS tags of the words eligible as keywords.
            window_size: co-occurrence window size used to link words.
            lower: when exactly ``True``, words are lower-cased.
            stopwords: extra stop words on top of spaCy's defaults.
        """
        # Defaults are immutable tuples so no state is shared between calls.

        # Set stop words
        self.set_stopwords(stopwords)

        # Parse text with spaCy
        doc = nlp(text)

        # Filter sentences
        sentences = self.sentence_segment(doc, candidate_pos, lower)  # list of list of words

        # Build vocabulary
        vocab = self.get_vocab(sentences)

        # Get token_pairs from windows
        token_pairs = self.get_token_pairs(window_size, sentences)

        # Get normalized matrix
        g = self.get_matrix(vocab, token_pairs)

        # Initialization for weight (pagerank value)
        pr = np.ones(len(vocab))

        # Iterate until the total weight changes by less than min_diff,
        # or until the step budget is used up.
        previous_total = 0
        for _ in range(self.steps):
            pr = (1 - self.d) + self.d * np.dot(g, pr)
            current_total = pr.sum()
            if abs(previous_total - current_total) < self.min_diff:
                break
            previous_total = current_total

        # Get weight for each node
        self.node_weight = {word: pr[index] for word, index in vocab.items()}
def keywordExtraction(sentences, candidate_pos, window_size, lower = True):
    """Rank keywords separately for each sentence.

    A fresh ``TextRank`` is built and run for every sentence; the keyword
    limit requested from it is the sentence's NLTK word-token count.

    Args:
        sentences: iterable of sentence strings.
        candidate_pos: POS tags forwarded to ``TextRank.analyze``.
        window_size: co-occurrence window forwarded to ``TextRank.analyze``.
        lower: forwarded to ``TextRank.analyze``.

    Returns:
        A list holding one keyword dict (as returned by
        ``TextRank.get_keywords``) per input sentence, in input order.
    """
    def _rank_one(text):
        ranker = TextRank()
        ranker.analyze(
            text,
            candidate_pos=candidate_pos,
            window_size=window_size,
            lower=lower,
        )
        token_count = len(nltk.tokenize.word_tokenize(text))
        return ranker.get_keywords(token_count)

    return [_rank_one(text) for text in sentences]