diff --git a/README.md b/README.md
index 5664a5f..0e5cf78 100644
--- a/README.md
+++ b/README.md
@@ -25,7 +25,7 @@ This project should work in the latest releases of Python 2.7 and Python 3. By d
 
 ## Configuring
 
-There are several parameters that control the behavior of the bot. You can adjust them by setting them in your `local_settings.py` file. 
+There are several parameters that control the behavior of the bot. You can adjust them by setting them in your `local_settings.py` file.
 
 ```
 ODDS = 8
@@ -42,6 +42,38 @@ ORDER = 2
 
 The ORDER variable represents the Markov index, which is a measure of associativity in the generated Markov chains. 2 is generally more incoherent and 3 or 4 is more lucid. I tend to stick with 2.
 
+### Additional sources
+
+This bot was originally designed to pull tweets from a Twitter account, but it can also process comma-separated text from a text file or scrape content from the web.
+
+#### Static Text
+To use a local text file, set `STATIC_TEST = True` and specify the name of a text file containing comma-separated "tweets" as `TEST_SOURCE`.
+
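+For example, to generate from the included `testcorpus.txt` (or any comma-separated text file), you might set:
+
+```
+STATIC_TEST = True
+TEST_SOURCE = "testcorpus.txt"
+```
+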
+#### Web Content
+To scrape content from the web, set `SCRAPE_URL = True`. This bot uses the [`find_all()` method](https://www.crummy.com/software/BeautifulSoup/bs4/doc/#find-all) of the BeautifulSoup library, which requires three inputs to be defined in `local_settings.py` (see the example after this list):
+
+1. A list of URLs to scrape as `SRC_URL`.
+2. A list, `WEB_CONTEXT`, of the [names](https://www.crummy.com/software/BeautifulSoup/bs4/doc/#id11) of the elements to extract from each corresponding URL: "div", "h1" for level-one headings, "a" for links, and so on. To extract more than one name from a single page, repeat the URL in the `SRC_URL` list once for each name.
+3. A list, `WEB_ATTRIBUTES`, of dictionaries containing [attributes](https://www.crummy.com/software/BeautifulSoup/bs4/doc/#attrs) to filter by. For instance, to limit the search to divs of class "title", pass the dictionary `{"class": "title"}`. Use an empty dictionary, `{}`, for any page and name for which you don't wish to specify attributes.
+
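+For example, the settings below (with placeholder URLs) would extract every `span` of class `example-text` from the first page and every `h2` from the second:
+
+```
+SCRAPE_URL = True
+SRC_URL = ['http://www.example.com/one', 'https://www.example.com/two']
+WEB_CONTEXT = ['span', 'h2']
+WEB_ATTRIBUTES = [{'class': 'example-text'}, {}]
+```
+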
+__Note:__ Web scraping is experimental and may give you unexpected results. Make sure to test the bot in debugging mode before publishing.
+
 ## Debugging
 
 If you want to test the script or to debug the tweet generation, you can skip the random number generation and not publish the resulting tweets to Twitter.
@@ -49,7 +81,7 @@ If you want to test the script or to debug the tweet generation, you can skip th
 First, adjust the `DEBUG` variable in `local_settings.py`.
 
 ```
-DEBUG = True 
+DEBUG = True
 ```
 
 After that, commit the change and `git push heroku master`. Then run the command `heroku run worker` on the command line and watch what happens.
diff --git a/ebooks.py b/ebooks.py
index 6e99c4c..41b5255 100644
--- a/ebooks.py
+++ b/ebooks.py
@@ -3,21 +3,25 @@
 import sys
 import twitter
 import markov
+from bs4 import BeautifulSoup
 try:
     # Python 3
     from html.entities import name2codepoint as n2c
+    from urllib.request import urlopen
 except ImportError:
     # Python 2
     from htmlentitydefs import name2codepoint as n2c
+    from urllib2 import urlopen
     chr = unichr
 from local_settings import *
 
+
 def connect():
-    api = twitter.Api(consumer_key=MY_CONSUMER_KEY,
-                      consumer_secret=MY_CONSUMER_SECRET,
-                      access_token_key=MY_ACCESS_TOKEN_KEY,
-                      access_token_secret=MY_ACCESS_TOKEN_SECRET)
-    return api
+    return twitter.Api(consumer_key=MY_CONSUMER_KEY,
+                       consumer_secret=MY_CONSUMER_SECRET,
+                       access_token_key=MY_ACCESS_TOKEN_KEY,
+                       access_token_secret=MY_ACCESS_TOKEN_SECRET)
+
 
 def entity(text):
     if text[:2] == "&#":
@@ -34,119 +38,153 @@ def entity(text):
         try:
             text = chr(numero)
         except KeyError:
-            pass 
+            pass
     return text
 
+
 def filter_tweet(tweet):
-    tweet.text = re.sub(r'\b(RT|MT) .+','',tweet.text) #take out anything after RT or MT
-    tweet.text = re.sub(r'(\#|@|(h\/t)|(http))\S+','',tweet.text) #Take out URLs, hashtags, hts, etc.
-    tweet.text = re.sub(r'\n','', tweet.text) #take out new lines.
-    tweet.text = re.sub(r'\"|\(|\)', '', tweet.text) #take out quotes.
-    tweet.text = re.sub(r'\s+\(?(via|says)\s@\w+\)?', '', tweet.text) # remove attribution
+    tweet.text = re.sub(r'\b(RT|MT) .+', '', tweet.text)  # take out anything after RT or MT
+    tweet.text = re.sub(r'(\#|@|(h\/t)|(http))\S+', '', tweet.text)  # Take out URLs, hashtags, hts, etc.
+    tweet.text = re.sub(r'\s+', ' ', tweet.text)  # collapse consecutive whitespace (including newlines) to single spaces.
+    tweet.text = re.sub(r'\"|\(|\)', '', tweet.text)  # take out quotes.
+    tweet.text = re.sub(r'\s+\(?(via|says)\s@\w+\)?', '', tweet.text)  # remove attribution
     htmlsents = re.findall(r'&\w+;', tweet.text)
-    if len(htmlsents) > 0 :
-        for item in htmlsents:
-            tweet.text = re.sub(item, entity(item), tweet.text)
-    tweet.text = re.sub(r'\xe9', 'e', tweet.text) #take out accented e
+    for item in htmlsents:
+        tweet.text = tweet.text.replace(item, entity(item))
+    tweet.text = re.sub(r'\xe9', 'e', tweet.text)  # take out accented e
     return tweet.text
-
-
-
+
+
+def scrape_page(src_url, web_context, web_attributes):
+    tweets = []
+    last_url = ""
+    for i in range(len(src_url)):
+        if src_url[i] != last_url:
+            # only fetch and parse a page once, even if it appears in
+            # SRC_URL several times with different names to extract.
+            last_url = src_url[i]
+            print(">>> Scraping {0}".format(src_url[i]))
+            try:
+                page = urlopen(src_url[i])
+            except Exception:
+                last_url = "ERROR"
+                import traceback
+                print(">>> Error scraping {0}:".format(src_url[i]))
+                print(traceback.format_exc())
+                continue
+            soup = BeautifulSoup(page, 'html.parser')
+        hits = soup.find_all(web_context[i], attrs=web_attributes[i])
+        if not hits:
+            print(">>> No results found!")
+            continue
+        errors = 0
+        for hit in hits:
+            try:
+                tweet = str(hit.text).strip()
+            except (UnicodeEncodeError, UnicodeDecodeError):
+                errors += 1
+                continue
+            if tweet:
+                tweets.append(tweet)
+        if errors > 0:
+            print(">>> We had trouble reading {} result{}.".format(errors, "s" if errors > 1 else ""))
+    return tweets
+
+
 def grab_tweets(api, max_id=None):
-    source_tweets=[]
+    source_tweets = []
     user_tweets = api.GetUserTimeline(screen_name=user, count=200, max_id=max_id, include_rts=True, trim_user=True, exclude_replies=True)
-    max_id = user_tweets[len(user_tweets)-1].id-1
+    max_id = user_tweets[-1].id - 1
     for tweet in user_tweets:
         tweet.text = filter_tweet(tweet)
         if re.search(SOURCE_EXCLUDE, tweet.text):
             continue
-        if len(tweet.text) != 0:
+        if tweet.text:
             source_tweets.append(tweet.text)
     return source_tweets, max_id
 
-if __name__=="__main__":
+
+if __name__ == "__main__":
     order = ORDER
-    if DEBUG==False:
-        guess = random.choice(range(ODDS))
-    else:
-        guess = 0
+    guess = 0
+    if ODDS and not DEBUG:
+        guess = random.randint(0, ODDS - 1)
 
-    if guess == 0:
-        if STATIC_TEST==True:
+    if guess:
+        print(str(guess) + " No, sorry, not this time.")  # message if the random number fails.
+        sys.exit()
+    else:
+        api = connect()
+        source_tweets = []
+        if STATIC_TEST:
             file = TEST_SOURCE
             print(">>> Generating from {0}".format(file))
             string_list = open(file).readlines()
             for item in string_list:
-                source_tweets = item.split(",")
-        else:
-            source_tweets = []
+                source_tweets += item.split(",")
+        if SCRAPE_URL:
+            source_tweets += scrape_page(SRC_URL, WEB_CONTEXT, WEB_ATTRIBUTES)
+        if SOURCE_ACCOUNTS and len(SOURCE_ACCOUNTS[0]) > 0:
+            twitter_tweets = []
             for handle in SOURCE_ACCOUNTS:
-                user=handle
-                api=connect()
+                user = handle
                 handle_stats = api.GetUser(screen_name=user)
                 status_count = handle_stats.statuses_count
-                max_id=None
-                if status_count<3200:
-                    my_range = (status_count/200) + 1
-                else:
-                    my_range = 17
-                for x in range(my_range)[1:]:
-                    source_tweets_iter, max_id = grab_tweets(api,max_id)
-                    source_tweets += source_tweets_iter
-                print("{0} tweets found in {1}".format(len(source_tweets), handle))
-                if len(source_tweets) == 0:
+                max_id = None
+                my_range = min(17, int((status_count - 1) / 200) + 2)  # number of 200-tweet batches, capped at the API's 3,200-status limit
+                for x in range(1, my_range):
+                    twitter_tweets_iter, max_id = grab_tweets(api, max_id)
+                    twitter_tweets += twitter_tweets_iter
+                print("{0} tweets found in {1}".format(len(twitter_tweets), handle))
+                if not twitter_tweets:
                     print("Error fetching tweets from Twitter. Aborting.")
                     sys.exit()
+                else:
+                    source_tweets += twitter_tweets
         mine = markov.MarkovChainer(order)
         for tweet in source_tweets:
-            if re.search('([\.\!\?\"\']$)', tweet):
-                pass
-            else:
-                tweet+="."
+            if not re.search(r'([\.\!\?\"\']$)', tweet):
+                tweet += "."
             mine.add_text(tweet)
-
-        for x in range(0,10):
+
+        for x in range(0, 10):
             ebook_tweet = mine.generate_sentence()
 
-            #randomly drop the last word, as Horse_ebooks appears to do.
-            if random.randint(0,4) == 0 and re.search(r'(in|to|from|for|with|by|our|of|your|around|under|beyond)\s\w+$', ebook_tweet) != None:
-                print("Losing last word randomly")
-                ebook_tweet = re.sub(r'\s\w+.$','',ebook_tweet)
-                print(ebook_tweet)
-
-            #if a tweet is very short, this will randomly add a second sentence to it.
-            if ebook_tweet != None and len(ebook_tweet) < 40:
-                rando = random.randint(0,10)
-                if rando == 0 or rando == 7:
+            # randomly drop the last word, as Horse_ebooks appears to do.
+            if random.randint(0, 4) == 0 and re.search(r'(in|to|from|for|with|by|our|of|your|around|under|beyond)\s\w+$', ebook_tweet) is not None:
+                print("Losing last word randomly")
+                ebook_tweet = re.sub(r'\s\w+.$', '', ebook_tweet)
+                print(ebook_tweet)
+
+            # if a tweet is very short, this will randomly add a second sentence to it.
+            if ebook_tweet is not None and len(ebook_tweet) < 40:
+                rando = random.randint(0, 10)
+                if rando == 0 or rando == 7:
                     print("Short tweet. Adding another sentence randomly")
                     newer_tweet = mine.generate_sentence()
-                    if newer_tweet != None:
-                        ebook_tweet += " " + mine.generate_sentence()
-                    else:
-                        ebook_tweet = ebook_tweet
+                    if newer_tweet is not None:
+                        ebook_tweet += " " + newer_tweet
                 elif rando == 1:
-                    #say something crazy/prophetic in all caps
+                    # say something crazy/prophetic in all caps
                     print("ALL THE THINGS")
                     ebook_tweet = ebook_tweet.upper()
 
-            #throw out tweets that match anything from the source account.
-            if ebook_tweet != None and len(ebook_tweet) < 110:
+            # throw out tweets that match anything from the source account.
+            if ebook_tweet is not None and len(ebook_tweet) < 110:
                 for tweet in source_tweets:
                     if ebook_tweet[:-1] not in tweet:
                         continue
-                    else: 
+                    else:
                         print("TOO SIMILAR: " + ebook_tweet)
                         sys.exit()
-
-                if DEBUG == False:
+
+                if not DEBUG:
                     status = api.PostUpdate(ebook_tweet)
                     print(status.text.encode('utf-8'))
                 else:
                     print(ebook_tweet)
-            elif ebook_tweet == None:
+            elif not ebook_tweet:
                 print("Tweet is empty, sorry.")
             else:
                 print("TOO LONG: " + ebook_tweet)
-    else:
-        print(str(guess) + " No, sorry, not this time.") #message if the random number fails.
diff --git a/local_settings_example.py b/local_settings_example.py
index 81e5945..2b39cce 100644
--- a/local_settings_example.py
+++ b/local_settings_example.py
@@ -2,17 +2,21 @@
 Local Settings for a heroku_ebooks account. #fill in the name of the account you're tweeting from here.
 '''
 
-#configuration
+# Configuration
 MY_CONSUMER_KEY = 'Your Twitter API Consumer Key'
 MY_CONSUMER_SECRET = 'Your Consumer Secret Key'
 MY_ACCESS_TOKEN_KEY = 'Your Twitter API Access Token Key'
 MY_ACCESS_TOKEN_SECRET = 'Your Access Token Secret'
 
-SOURCE_ACCOUNTS = [""] #A list of comma-separated, quote-enclosed Twitter handles of account that you'll generate tweets based on. It should look like ["account1", "account2"]. If you want just one account, no comma needed.
-ODDS = 8 #How often do you want this to run? 1/8 times?
-ORDER = 2 #how closely do you want this to hew to sensical? 2 is low and 4 is high.
-SOURCE_EXCLUDE = r'^$' #Source tweets that match this regexp will not be added to the Markov chain. You might want to filter out inappropriate words for example.
-DEBUG = True #Set this to False to start Tweeting live
-STATIC_TEST = False #Set this to True if you want to test Markov generation from a static file instead of the API.
-TEST_SOURCE = ".txt" #The name of a text file of a string-ified list for testing. To avoid unnecessarily hitting Twitter API. You can use the included testcorpus.txt, if needed.
-TWEET_ACCOUNT = "" #The name of the account you're tweeting to.
+SOURCE_ACCOUNTS = [""]  # A list of comma-separated, quote-enclosed Twitter handles of accounts that you'll generate tweets based on. It should look like ["account1", "account2"]. If you want just one account, no comma needed.
+ODDS = 8  # How often do you want this to run? 1/8 times?
+ORDER = 2  # How closely do you want this to hew to sensical? 2 is low and 4 is high.
+SOURCE_EXCLUDE = r'^$'  # Source tweets that match this regexp will not be added to the Markov chain. You might want to filter out inappropriate words, for example.
+DEBUG = True  # Set this to False to start Tweeting live.
+STATIC_TEST = False  # Set this to True if you want to test Markov generation from a static file instead of the API.
+TEST_SOURCE = ".txt"  # The name of a text file of a string-ified list for testing, to avoid unnecessarily hitting the Twitter API. You can use the included testcorpus.txt, if needed.
+SCRAPE_URL = False  # Set this to True to scrape a webpage.
+SRC_URL = ['http://www.example.com/one', 'https://www.example.com/two']  # A comma-separated list of URLs to scrape.
+WEB_CONTEXT = ['span', 'h2']  # A comma-separated list of the tag or object to search for in each page above.
+WEB_ATTRIBUTES = [{'class': 'example-text'}, {}]  # A list of dictionaries containing the attributes to filter by for each page.
+TWEET_ACCOUNT = ""  # The name of the account you're tweeting to.
diff --git a/markov.py b/markov.py
index b9f78a0..6ff8530 100644
--- a/markov.py
+++ b/markov.py
@@ -1,23 +1,24 @@
 import random
 import re
 
+
 class MarkovChainer(object):
     def __init__(self, order):
-        self.order=order
+        self.order = order
         self.beginnings = []
         self.freq = {}
 
-    #pass a string with a terminator to the function to add it to the markov lists.
+    # pass a string with a terminator to the function to add it to the markov lists.
     def add_sentence(self, string, terminator):
        data = "".join(string)
        words = data.split()
        buf = []
        if len(words) > self.order:
            words.append(terminator)
-           self.beginnings.append(words[0:self.order]) 
+           self.beginnings.append(words[0:self.order])
        else:
            pass
-        
+
        for word in words:
            buf.append(word)
            if len(buf) == self.order + 1:
@@ -44,21 +45,21 @@ def add_text(self, text):
         else:
             sentence = piece
 
-    #Generate the goofy sentences that become your tweet.
+    # Generate the goofy sentences that become your tweet.
     def generate_sentence(self):
         res = random.choice(self.beginnings)
         res = res[:]
-        if len(res)==self.order:
+        if len(res) == self.order:
             nw = True
-            while nw != None:
+            while nw is not None:
                 restup = (res[-2], res[-1])
                 try:
                     nw = self.next_word_for(restup)
-                    if nw != None:
+                    if nw is not None:
                         res.append(nw)
                     else:
                         continue
-                except:
+                except Exception:
                     nw = False
             new_res = res[0:-2]
             if new_res[0].istitle() or new_res[0].isupper():
@@ -68,7 +69,7 @@
             sentence = ""
             for word in new_res:
                 sentence += word + " "
-            sentence += res[-2] + res[-1]
+            sentence += res[-2] + ("" if res[-1] in ".!?;:" else " ") + res[-1]
         else:
             sentence = None
 
@@ -79,8 +80,9 @@
             arr = self.freq[words]
             next_words = random.choice(arr)
             return next_words
-        except:
-            return None
+        except Exception:
+            return None
+
 
 if __name__ == "__main__":
     print("Try running ebooks.py first")
diff --git a/requirements.txt b/requirements.txt
index 4658fe9..be748d8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,2 @@
-python-twitter
\ No newline at end of file
+python-twitter
+beautifulsoup4