[Python] twitter をターミナル上で楽しむ

January 28, 2013

夜フクロウもいいんですが、ターミナルで表示できてもいいかなと思ったので作りました。 ライブラリはtweepyを使います。StreamingをつかってリアルタイムでTL取得を目指します。 TL取得のポーリング間隔に嫌気がさしてる方にはおすすめ(?)

python-twitterを利用してツイートを投稿するスクリプトもあったのですが、Apiの仕様変更かなんかでうまく動かなかったので、tweepyを使うバージョンに書き直したい。 書いたらブログに投稿したいな。

tweepyをinstall

easy_install tweepy

※セットアップはそれぞれの環境に合わせてくださいね

書く

シークレットキーとかは事前に取得すること。

#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
import tweepy
from tweepy import Stream, TweepError
import logging
import urllib
CONSUMER_KEY=''
CONSUMER_SECRET=''
ACCESS_TOKEN=''
ACCESS_TOKEN_SECRET=''
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
class CustomStreamListener(tweepy.StreamListener):
def on_status(self, status):
try:
print "---%s---\n %s\n" % (
status.author.screen_name,
status.text)
except Exception, e:
print >> sys.stderr, 'Encountered Exception:', e
pass
def on_error(self, status_code):
print >> sys.stderr, 'Encountered error with status code:', status_code
return True # Don't kill the stream
def on_timeout(self):
print >> sys.stderr, 'Timeout...'
return True # Don't kill the stream
class UserStream(Stream):
def user_stream(self, follow=None, track=None, async=False, locations=None):
self.parameters = {"delimited": "length", }
self.headers['Content-type'] = "application/x-www-form-urlencoded"
if self.running:
raise TweepError('Stream object already connected!')
self.scheme = "https"
self.host = 'userstream.twitter.com'
self.url = '/2/user.json'
if follow:
self.parameters['follow'] = ','.join(map(str, follow))
if track:
self.parameters['track'] = ','.join(map(str, track))
if locations and len(locations) > 0:
assert len(locations) % 4 == 0
self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
self.body = urllib.urlencode(self.parameters)
logging.debug("[ User Stream URL ]: %s://%s%s" % (self.scheme, self.host, self.url))
logging.debug("[ Request Body ] :" + self.body)
self._start(async)
def main():
stream = UserStream(auth, CustomStreamListener())
stream.timeout = None
stream.user_stream()
if __name__ == "__main__":
main()
view raw twitter_streaming.py hosted with ❤ by GitHub

動作確認

068c622ac141749f29706e8c70ff1ed6 300x46

まとめ

夜フクロウはリアルタイム取得だし機能もいっぱいで便利。 このプログラムはほとんど使わないと思う。


Profile picture

Written by Narimichi Takamura (@nari_ex) who works at Topotal as CEO. He love engineering and fighting game.