Revision: 8004
Author: xqt
Date: 2010-03-13 16:37:27 +0000 (Sat, 13 Mar 2010)
Log Message:
-----------
use site.has_api()
Modified Paths:
--------------
trunk/pywikipedia/userlib.py
Modified: trunk/pywikipedia/userlib.py
===================================================================
--- trunk/pywikipedia/userlib.py 2010-03-13 16:14:53 UTC (rev 8003)
+++ trunk/pywikipedia/userlib.py 2010-03-13 16:37:27 UTC (rev 8004)
@@ -12,8 +12,6 @@
import re
import wikipedia, query
-
-
class AutoblockUser(wikipedia.Error):
"""
The class AutoblockUserError is an exception that is raised whenever
@@ -63,48 +61,47 @@
#if self.site().versionnumber() >= 16:
# self._urToken = None
if name[0] == '#':
- #This user is probably being queried for purpose of lifting an autoblock.
- wikipedia.output("This is an autoblock ID, you can only use to unblock it.")
-
-
+ # This user is probably being queried for purpose of lifting an
+ # autoblock.
+ wikipedia.output(
+ "This is an autoblock ID, you can only use to unblock it.")
-
def site(self):
return self._site
-
+
def name(self):
return self._name
-
+
def __str__(self):
return u'%s:%s' % (self.site() , self.name() )
-
+
def __repr__(self):
return self.__str__()
def _load(self):
getall(self.site(), [self], force=True)
return
-
+
def registrationTime(self, force = False):
if not hasattr(self, '_registrationTime') or force:
self._load()
return self._registrationTime
-
+
def editCount(self, force = False):
if not hasattr(self, '_editcount') or force:
self._load()
return self._editcount
-
+
def isBlocked(self, force = False):
if not self._blocked or force:
self._load()
return self._blocked
-
+
def groups(self, force = False):
if not self._groups or force:
self._load()
return self._groups
-
+
def getUserPage(self, subpage=''):
if self.name()[0] == '#':
#This user is probably being queried for purpose of lifting
@@ -135,15 +132,12 @@
self._load()
if not self._mailable:
raise UserActionRefuse("This user is not mailable")
-
if not self.site().isAllowed('sendemail'):
raise UserActionRefuse("You don't have permission to send mail")
-
- if wikipedia.config.use_api and self.site().versionnumber() >= 14:
- pass # will handle NotImplementedError later
- else:
+
+ if not self.site().has_api() or self.site().versionnumber() < 14:
return self.sendMailOld(subject, text, ccMe)
-
+
params = {
'action': 'emailuser',
'target': self.name(),
@@ -153,22 +147,19 @@
}
if ccMe:
params['ccme'] = 1
-
result = query.GetData(params, self.site())
if 'error' in result:
code = result['error']['code']
if code == 'usermaildisabled ':
wikipedia.output("User mail has been disabled")
#elif code == '':
- #
-
+ #
elif 'emailuser' in result:
if result['emailuser']['result'] == 'Success':
wikipedia.output(u'Email sent.')
return True
-
return False
-
+
def sendMailOld(self, subject = u'', text = u'', ccMe = False):
addr = self.site().put_address('Special:EmailUser')
predata = {
@@ -179,11 +170,9 @@
}
if ccMe:
predata['wpCCMe'] = '1'
-
predata['wpEditToken'] = self.site().getToken()
-
- response, data = self.site().postForm(address, predata, sysop = False)
+ response, data = self.site().postForm(address, predata, sysop = False)
if data:
if 'var wgAction = "success";' in data:
wikipedia.output(u'Email sent.')
@@ -200,11 +189,12 @@
""" Yields pages that the user has edited, with an upper bound of
``limit''.
Pages returned are not guaranteed to be unique
(straight Special:Contributions parsing, in chunks of 500
items)."""
+ if not self.site().has_api():
+ raise NotImplementedError
# please stay this in comment until the regex is fixed
- #if wikipedia.config.use_api:
- #for pg, oldid, date, comment in self._ContributionsOld(limit):
- # yield pg, oldid, date, comment
- #return
+ # for pg, oldid, date, comment in self._ContributionsOld(limit):
+ # yield pg, oldid, date, comment
+ # return
params = {
'action': 'query',
@@ -218,7 +208,6 @@
params['uclimit'] = wikipedia.config.special_page_limit
if limit > 5000 and self.site().isAllowed('apihighlimits'):
params['uclimit'] = 5000
-
if namespace:
params['ucnamespace'] = namespace
# An user is likely to contribute on several pages,
@@ -250,29 +239,23 @@
#This user is probably being queried for purpose of lifting
#an autoblock, so has no contribs.
raise AutoblockUser
-
#
#TODO: fix contribRX regex
#
offset = 0
step = min(limit,500)
older_str = None
-
if self.site().versionnumber() <= 11:
older_str = self.site().mediawiki_message('sp-contributions-older')
else:
older_str = self.site().mediawiki_message('pager-older-n')
-
if older_str.startswith('{{PLURAL:$1'):
older_str = older_str[13:]
older_str = older_str[older_str.find('|')+1:]
older_str = older_str[:-2]
older_str = older_str.replace('$1',str(step))
-
address = self.site().contribs_address(self.name(),limit=step)
contribRX = re.compile(r'<li[^>]*> *<a href="(?P<url>[^"]*?)" title="[^"]+">(?P<date>[^<]+)</a>.*>%s</a>\) *(<span class="[^"]+">[A-Za-z]</span>)* *<a href="[^"]+" (class="[^"]+" )?title="[^"]+">(?P<title>[^<]+)</a> *(?P<comment>.*?)(?P<top><strong> *\(top\) *</strong>)? *(<span class="mw-rollback-link">\[<a href="[^"]+token=(?P<rollbackToken>[^"]+)%2B%5C".*%s</a>\]</span>)? *</li>' % (self.site().mediawiki_message('diff'),self.site().mediawiki_message('rollback') ) )
-
-
while offset < limit:
data = self.site().getUrl(address)
for pg in contribRX.finditer(data):
@@ -284,7 +267,6 @@
top = None
if pg.group('top'):
top = True
-
# top, new, minor, should all go in a flags field
yield wikipedia.Page(self.site(), pg.group('title')), oldid, date, comment
@@ -296,19 +278,13 @@
address = nextRX.group('address').replace('&amp;','&')
else:
break
-
+
def uploadedImages(self, number = 10):
- try:
- if wikipedia.config.use_api and self.site().versionnumber() >= 11:
- apitest = self.site().api_address()
- del apitest
- else:
- raise NotImplementedError #No enable api or version not support
- except NotImplementedError:
+ if not self.site().has_api() or self.site().versionnumber() < 11:
for p,t,c,a in self._uploadedImagesOld(number):
yield p,t,c,a
return
-
+
params = {
'action': 'query',
'list': 'logevents',
@@ -322,29 +298,24 @@
for info in data['query']['logevents']:
count += 1
yield wikipedia.ImagePage(self.site(), info['title']), info['timestamp'], info['comment'], False
-
+
if count >= number:
break
-
if 'query-continue' in data and count < number:
params['lestart'] = data['query-continue']['logevents']['lestart']
else:
break
-
-
+
def _uploadedImagesOld(self, number = 10):
"""Yield ImagePages from
Special:Log&type=upload"""
regexp = re.compile('<li[^>]*>(?P<date>.+?)\s+<a href=.*?>(?P<user>.+?)</a> .* uploaded "<a href=".*?"(?P<new> class="new")? title="(Image|File):(?P<image>.+?)"\s*>(?:.*?<span class="comment">(?P<comment>.*?)</span>)?', re.UNICODE)
-
path = self.site().log_address(number, mode = 'upload', user = self.name())
html = self.site().getUrl(path)
-
redlink_key = self.site().mediawiki_message('red-link-title')
redlink_tail_len = None
if redlink_key.startswith('$1 '):
redlink_tail_len = len(redlink_key[3:])
-
for m in regexp.finditer(html):
image = m.group('image')
deleted = False
@@ -355,7 +326,6 @@
date = m.group('date')
comment = m.group('comment') or ''
-
yield wikipedia.ImagePage(self.site(), image), date, comment, deleted
def block(self, expiry = None, reason = None, anon= True, noCreate = False,
@@ -384,24 +354,17 @@
raise AutoblockUser
if self.isBlocked() and not reBlock:
raise AlreadyBlocked()
-
+
self.site()._getActionUser('block', sysop=True)
-
if not expiry:
expiry = wikipedia.input(u'Please enter the expiry time for the block:')
if not reason:
reason = wikipedia.input(u'Please enter a reason for the block:')
-
- try:
- if wikipedia.config.use_api and self.site().versionnumber() >= 12:
- x = self.site().api_address()
- del x
- else:
- raise NotImplementedError
- except NotImplementedError:
+
+ if not self.site().has_api() or self.site().versionnumber() < 12:
return self._blockOld(expiry, reason, anon, noCreate,
- onAutoblock, banMail, watchUser, allowUsertalk, reBlock)
-
+ onAutoblock, banMail, watchUser,
+ allowUsertalk, reBlock)
params = {
'action': 'block',
'user': self.name(),
@@ -425,7 +388,7 @@
params['reblock'] = 1
if allowUsertalk:
params['allowusertalk'] = 1
-
+
data = query.GetData(params, self.site(), sysop=True)
if 'error' in data: #error occured
errCode = data['error']['code']
@@ -441,7 +404,7 @@
raise BlockError("expiry time is the past")
elif errCode == 'cantblock-email':
raise BlockError("You don't have permission to ban mail")
-
+
elif 'block' in data: #success
return True
else:
@@ -461,16 +424,12 @@
#an autoblock, so can't be blocked.
raise AutoblockUser
sefl.site()._getActionUser('block', sysop=True)
-
if expiry is None:
expiry = input(u'Please enter the expiry time for the block:')
if reason is None:
reason = input(u'Please enter a reason for the block:')
-
token = self.site().getToken(self, sysop = True)
-
wikipedia.output(u"Blocking [[User:%s]]..." % self.name())
-
boolStr = ['0','1']
predata = {
'wpBlockAddress': self.name(),
@@ -485,10 +444,9 @@
'wpBlock': 'Block this user',
'wpEditToken': token
}
-
address = self.site().block_address()
- response, data = self.site().postForm(address, predata, sysop = True)
+ response, data = self.site().postForm(address, predata, sysop = True)
if data:
if self.site().mediawiki_message('ipb_already_blocked').replace('$1', self.name()) in data:
raise AlreadyBlockedError
@@ -508,26 +466,21 @@
blockID = self.name()[1:]
else:
blockID = self._getBlockID()
-
self._unblock(blockID,reason)
def _getBlockID(self):
wikipedia.output(u"Getting block id for [[User:%s]]..." % self.name())
-
address = self.site().blocksearch_address(self.name())
data = self.site().getUrl(address)
bIDre = re.search(r'action=unblock&id=(\d+)', data)
if not bIDre:
wikipedia.output(data)
raise BlockIDError
-
return bIDre.group(1)
def _unblock(self, blockID, reason):
wikipedia.output(u"Unblocking [[User:%s]]..." % self.name())
-
token = self.site().getToken(self, sysop = True)
-
predata = {
'id': blockID,
'wpUnblockReason': reason,
@@ -535,7 +488,6 @@
'wpEditToken': token,
}
address = self.site().unblock_address()
-
response, data = self.site().postForm(address, predata, sysop = True)
if response.code != 302:
if self.site().mediawiki_message('ipb_cant_unblock').replace('$1',blockID) in data:
@@ -573,13 +525,12 @@
self.throttle = throttle
self.force = force
self.sleeptime = 15
-
for user in users:
if not hasattr(user, '_editcount') or force:
self.users.append(user)
elif wikipedia.verbose:
wikipedia.output(u"BUGWARNING: %s already done!" % user.name())
-
+
def run(self):
if self.users:
while True:
@@ -608,7 +559,7 @@
uj._mailable = ("emailable" in x)
uj._blocked = ('blockedby' in x)
#if self._blocked: #Get block ID
-
+
def getData(self):
users = {}
params = {
@@ -637,7 +588,6 @@
>>
wikipedia.output(exampleUser.getUserPage('Lipsum').get())
>> wikipedia.output(exampleUser.getUserTalkPage().get())
""")
-
# unit tests
import tests.test_userlib
import unittest