emacs-subreddit/src/subreddit/authenticated_reddit.py

319 lines
12 KiB
Python

# -*- coding: utf-8 -*-
# pylint: disable=wrong-import-position
from __future__ import unicode_literals
from __future__ import print_function
import os
import re
import random
import webbrowser
import json
import functools
import sys
import logging
from time import time
import datetime
from multiprocessing import Process
from .rtv import config
config.TEMPLATES = os.path.join(os.path.dirname(__file__), 'templates')
config.DEFAULT_CONFIG = os.path.join(config.TEMPLATES, 'rtv.cfg')
config.DEFAULT_MAILCAP = os.path.join(config.TEMPLATES, 'mailcap')
config.HISTORY = os.path.join(config.XDG_DATA_HOME, 'nnreddit', 'history.log')
config.TOKEN = os.path.join(config.XDG_DATA_HOME, 'nnreddit', 'refresh-token')
import praw
from praw import Reddit
from prawcore.sessions import session
from prawcore.exceptions import OAuthException
from prawcore import Authorizer
from .rtv.oauth import OAuthHTTPServer, OAuthHandler
from .rtv.exceptions import BrowserError
from .rtv import docs
with open(os.path.join(os.path.dirname(__file__), 'VERSION'), encoding="utf-8") as version_file:
version = version_file.read().strip()
class AuthenticatedReddit(Reddit):
@staticmethod
def open_url_silent(url):
stdout, stderr = os.dup(1), os.dup(2)
null = os.open(os.devnull, os.O_RDWR)
try:
os.dup2(null, 1)
os.dup2(null, 2)
webbrowser.open_new_tab(url)
finally:
try:
os.close(null)
except OSError:
pass
os.dup2(stdout, 1)
os.dup2(stderr, 2)
def __init__(self, **kwargs):
log_level = kwargs.pop('log_level', logging.NOTSET)
logging.basicConfig(level=log_level,
filename='/dev/null',
format='%(asctime)s %(levelname)s %(message)s',
datefmt="%Y-%m-%d %H:%M:%S")
log_prefix = kwargs.pop('log_prefix', None)
if log_prefix:
stamp = datetime.datetime.fromtimestamp(time()).strftime('%Y%m%d.%H%M%S')
logging.getLogger().addHandler(logging.FileHandler(log_prefix + stamp))
localhost = kwargs.pop('localhost', '127.0.0.1')
default_kwargs = {
'history_file': config.HISTORY,
'token_file': config.TOKEN,
'redirect_uri': 'http://' + localhost + ':17973',
'client_id': '5oagOpX2_NKVDej_iuZjFA',
'client_secret': '',
'redirect_port': 17973,
'user_agent': praw.const.USER_AGENT_FORMAT.\
format(':'.join([os.uname()[0], 'nnreddit', version])),
}
default_kwargs = { k: v for k,v in default_kwargs.items() if k not in kwargs }
kwargs.update(default_kwargs)
cfg = config.Config(**kwargs)
cfg.load_history()
cfg.load_refresh_token()
cfg.config['refresh_token'] = cfg.refresh_token
logging.getLogger().debug("Refresh token: %s", cfg.token_file)
super().__init__(**cfg.config)
if not cfg.refresh_token:
self._core \
= self._authorized_core \
= session(Authorizer(self._core._authorizer._authenticator))
state = str(random.randint(0, 65000))
url = self._authorized_core._authorizer._authenticator.\
authorize_url('permanent', ['edit',
'history',
'identity',
'mysubreddits',
'privatemessages',
'read',
'report',
'save',
'submit',
'subscribe',
'vote'], state)
docs_sub = re.compile(r'reddit terminal viewer', re.IGNORECASE)
docs.OAUTH_SUCCESS = docs_sub.sub('nnreddit', docs.OAUTH_SUCCESS)
docs.OAUTH_ACCESS_DENIED = docs_sub.sub('nnreddit', docs.OAUTH_ACCESS_DENIED)
print("::user::Please check your browser.", file=sys.stderr)
if cfg.token_file == "/dev/null":
cfg.refresh_token = None
else:
proc = Process(target=self.open_url_silent, args=(url,))
proc.start()
try:
proc.join(7)
if proc.is_alive():
raise BrowserError(
'Timeout waiting for browser to open')
finally:
try:
proc.terminate()
except OSError:
pass
server = OAuthHTTPServer(('', cfg.config['redirect_port']), OAuthHandler)
server.serve_forever()
try:
self._authorized_core._authorizer.authorize(OAuthHandler.params['code'])
except OAuthException:
print("::user::Ahh, that's too bad", file=sys.stderr)
except Exception as e:
print(f"::user::An unexpected error occurred: {e}", file=sys.stderr)
else:
cfg.refresh_token = self._authorized_core._authorizer.refresh_token
cfg.save_refresh_token()
if 'history_size' in cfg.config:
cfg.save_history()
self._bodies = {}
self._stream_comm = {}
self._stream_subm = {}
self._stream_inbox = None
@staticmethod
def make_dict(reddit_base):
with_nulls = json.dumps(vars(reddit_base), skipkeys=True, default=lambda o:
str(o) if hasattr(o, 'STR_FIELD') else None)
without_nulls = {k:v for k,v in json.loads(with_nulls).items()
if v is not None}
return without_nulls
@staticmethod
def collect_dicts(stream):
result = []
for i in stream:
if i is None:
break
result.append(AuthenticatedReddit.make_dict(i))
return result
def random_subreddit(self, *, nsfw: bool=False):
above = super().random_subreddit(nsfw=nsfw)
return above.display_name
def search(self, query, **generator_kwargs):
return [ x.display_name for x in self.subreddits.search(query, **generator_kwargs) ]
def popular(self, **generator_kwargs):
return [ x.display_name for x in self.subreddits.popular(**generator_kwargs) ]
def subscribe(self, display_name):
self.subreddit(display_name).subscribe()
def unsubscribe(self, display_name):
self.subreddit(display_name).unsubscribe()
def submit(self, display_name, title, **kwargs):
self.subreddit(display_name).submit(title, **kwargs)
def reply(self, name, body, q_reply_root):
(mytype, myid) = name.split("_", 1)
parent = None
if mytype == self.config.kinds['submission']:
parent = self.submission(myid)
elif mytype == self.config.kinds['comment']:
parent = self.comment(myid)
if q_reply_root and parent.link_id:
(_, root_id) = parent.link_id.split("_", 1)
parent = self.submission(root_id)
else:
raise ValueError('Unexpected name {} with type {}'.format(name, mytype))
parent.reply(body)
def edit(self, name, body):
(mytype, myid) = name.split("_", 1)
editable = None
if mytype == self.config.kinds['submission']:
editable = self.submission(myid)
elif mytype == self.config.kinds['comment']:
editable = self.comment(myid)
else:
raise ValueError('Unexpected name {} with type {}'.format(name, mytype))
editable.edit(body)
def remove(self, name):
(mytype, myid) = name.split("_", 1)
editable = None
if mytype == self.config.kinds['submission']:
editable = self.submission(myid)
elif mytype == self.config.kinds['comment']:
editable = self.comment(myid)
else:
raise ValueError('Unexpected name {} with type {}'.format(name, mytype))
editable.delete()
def comments(self, display_name):
if display_name not in self._stream_comm:
self._stream_comm[display_name] = self.subreddit(display_name).\
stream.comments(pause_after=0)
if display_name not in self._bodies:
self._bodies[display_name] = {}
dicts = self.collect_dicts(self._stream_comm.get(display_name))
for dic in dicts:
if 'body_html' in dic:
self._bodies[display_name][dic['id']] = dic['body_html']
else:
self._bodies[display_name][dic['id']] = 'Wow, such empty'
for k in list(dic):
if k.startswith('body'):
del dic[k]
return dicts
def vote(self, name, vote):
(mytype, myid) = name.split("_", 1)
votable = None
if mytype == self.config.kinds['submission']:
votable = self.submission(myid)
elif mytype == self.config.kinds['comment']:
votable = self.comment(myid)
else:
raise ValueError('Unexpected name {} with type {}'.format(name, mytype))
try:
if vote == 0:
votable.clear_vote()
elif vote < 0:
votable.downvote()
else:
votable.upvote()
except AttributeError as exc:
raise AttributeError('{} un-votable: {}'.format(name, str(exc))) from exc
def body(self, display_name, name):
(mytype, myid) = name.split("_", 1)
result = None
cached = self._bodies.get(display_name)
if cached:
result = cached.get(myid)
if not result:
if mytype == self.config.kinds['submission']:
result = self.submission(myid).selftext_html
else:
result = self.comment(myid).body_html
return result
def canonical_spelling(self, display_name):
lazy = self.subreddit(display_name)
# pragma pylint: disable=protected-access
lazy._fetch()
# pragma pylint: enable=protected-access
return lazy.display_name
def submissions(self, display_name):
if display_name not in self._stream_subm:
self._stream_subm[display_name] = self.subreddit(display_name).\
stream.submissions(pause_after=0)
if display_name not in self._bodies:
self._bodies[display_name] = {}
dicts = self.collect_dicts(self._stream_subm.get(display_name))
for dic in dicts:
if 'selftext_html' in dic:
self._bodies[display_name][dic['id']] = dic['selftext_html']
elif 'url' in dic:
self._bodies[display_name][dic['id']] \
= ''.join(['<div>', '<p>',
'<a href="{0}">{0}</a>'.format(dic.get('url')),
'</div>'])
else:
self._bodies[display_name][dic['id']] = 'Wow, such empty'
for k in list(dic):
if k.startswith('selftext'):
del dic[k]
return dicts
def user_subreddits(self):
return [sr.display_name for sr in self.user.subreddits()]
def user_attr(self, attr):
return getattr(self.user.me(), attr)
def inboxes(self, inbox_name):
if not self._stream_inbox:
self._stream_inbox = self.inbox.stream(pause_after=0)
if inbox_name not in self._bodies:
self._bodies[inbox_name] = {}
dicts = self.collect_dicts(self._stream_inbox)
dicts = [d for d in dicts if d.get('type') == 'comment_reply' or
d.get('type') == 'post_reply']
for dic in dicts:
if 'body_html' in dic:
self._bodies[inbox_name][dic['id']] = dic['body_html']
else:
self._bodies[inbox_name][dic['id']] = 'Wow, such empty'
for k in list(dic):
if k.startswith('body'):
del dic[k]
return dicts