Commit feb7f63f authored by Mattia Rizzolo's avatar Mattia Rizzolo

update chanserver-mod to the latest version

parent fb2433fa
# Simple chanserv helper script for Xchat
# (c) 2006,2007 Dennis Kaarsemaker
# (c) 2006-2012 Dennis Kaarsemaker
#
# Latest version can be found on http://github.com/seveas/chanserv.py
#
# Latest version can be found on http://www.kaarsemaker.net/software/
#
# This script is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# version 3, as published by the Free Software Foundation.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Usage instructions:
# Place in ~/.xchat2 for it to be autoloaded
# - Place in ~/.xchat2 for it to be autoloaded
# - Use /py load chanserv.py if you already started X-chat
# - Connect to freenode, if not connected (other networks will not work)
#
# It adds one command to xchat: /cs
# /cs understands the following arguments
# o or op - Let chanserv op you/others (/cs op, /cs op somenick)
#
# To give/take ops/voice:
#
# o or op - Let chanserv op you/others (/cs op, /cs op nick)
# v or voice - Let chanserv give you/others voice
# d or deop - Let chanserv deop you/others (/cs deop, /cs deop somenick)
# dv or devoice - Let chanserv decoice you/others (/cs devoice, /cs devoice somenick)
# k or kick - Op, kick, deop (/cs kick somenick [comment])
# b or ban - Op, ban, deop (/cs ban somenick)
# f or forward - Ban a user with a forward (/cs forward nick chan)
# n or nameban - GECOS ban (/cs nameban nick)
# m or mute - Op, mute, deop (/cs mute somenick)
# kb or kickban - Op, kickban, deop (/cs kb somenick [comment])
# kf or kickforward - Same as kb, but with a forward (/cs kf somenick channel [comment])
# kn or kicknameban - Same as kb, but with a GECOS ban (/cs kn somenick [comment])
# l or lart - A combination of kick, ban, nameban, ident ban and nick ban.
# u or unban - Op, unban, deop (/cs u somenick)
# t or topic - Op, set topic, deop (/cs t New topic here)
# m or mode - Op, change channel mode, deop (/cs mode modes here)
# i or invite - With nick as argument: op, invite, deop. Without nick: invite to a channel
# bans - Show bans that apply to someone without removing them (/cs bans nick_or_mask)
# d or deop - Let chanserv deop you/others (/cs deop, /cs deop nick)
# dv or devoice - Let chanserv decoice you/others (/cs devoice, /cs devoice nick)
#
# To op yourself, perform an action, and deop:
#
# k or kick - Kick a user, possibly with comment (/cs kick nick [comment])
# b or ban - Ban a user (/cs ban [-nihar] nick)
# kb or kickban - Kick and ban a user (/cs ban [-nihar] nick)
# f or forward - Ban a user with a forward (/cs forward [-nihar] nick chan)
# kf or kickforward - Kickban a user with a forward (/cs forward [-nihar] nick chan)
# m or mute - Mute a user (/cs mute [-nihar] nick)
# l or lart - A combination of kick and ban on all fields
# u or unban - Remove all bans for a user (/cs u nick)
# t or topic - Set channel topic (/cs t New topic here)
# m or mode - Change channel mode (/cs mode modes here)
# i or invite - Invite yourself or someone else (/cs invite [nick])
# bans - Show bans that apply to someone without removing them (/cs bans nick)
#
# * For (kick)ban and mute, it will use the ip-address or hostname instead of
# the nickname, unless you give a complete mask as argument. This works even
# after a person left by using /whowas. /whowas generally works up to a few
# hours after someone left.
# * Bans, forwards and mute take an extra optional argument that specifies
# what should be banned: nickname, ident, host, account and/or realname.
# /cs ban -nah nick -- Ban nick, account and host
# /cs forward -nihra nick #somewhere -- Forward all
#
# * These commands also take an extra argument to specify when bans/mutes
# should be lifted automatically.
# /cs ban -t600 nick -- Ban nick for 10 minutes
# /cs ban -nah -t3600 -- Ban nick, account and hostname for an hour
#
# * Unban will remove all bans matching the nick or mask you give as argument
# (* and ? wildcards work)
# * It won't actually kick, but use the /remove command
# * Script is made to work on Freenode, may need changes to work on other
# networks
# * The -n argument before any of the commands (eg: /cs -n m foo) will make you
# stay opped after the act
#
# Apart from the /cs command it also adds automatic rejoining magic. When you
# are /remove'd from a channel, it will automatically rejoin (X-chat already can
# do that for you if kicked). When attempting to (re)join a channel which is
# invite-only, has a key set or where you are banned, the script will poke
# chanserv to let you in and will automatically let you in if ChanServ helps
#
# Changelog of latest versions
# 1.0: - Use xchat.get_info for getting the xchat dir
# - If a nick in /cs u starts with 2 asterisks, a non-looked up nameban
# removal will be tried. This was the last item in the todo, so this is
# 1.0
# 1.0.1: - Add voice/devoice
# 1.0.2: - Fix complete mask detection
# - Fixed downloader
# 1.0.3 - Added /cs bans -- determine which bans apply to someone without removing them
# 1.0.4 - Update download link and allow a comment in /cs lart
# 1.0.5 - Don't require a comment in /cs lart
# The following additional features are implemented
# - Autorejoin for /remove
# - Auto-unmute when muted
# - Auto-unban via chanserv
# - Auto-invite via chanserv
# - Auto-getkey via chanserv
__module_name__ = "chanserv"
__module_version__ = "1.0.5"
__module_version__ = "2.2.4"
__module_description__ = "Chanserv helper"
import collections
import xchat
import time
import re
import os
# Event queue
pending = []
# /whois cache
users = {}
# /mode =bd 'cache'
bans = {}
_bans = {}
# channel modes
modes = {}
intercept_akick = False
KICK, BAN, MUTE, KICKBAN, UNBAN, TOPIC, MODE, NAMEBAN, KICKNAMEBAN, LART, INVITE, BANS, XBAN = range(13)
# /mode bq 'cache'
bans = collections.defaultdict(list)
quiets = collections.defaultdict(list)
collecting_bans = []
abbreviations = {'kick': 'k', 'ban': 'b', 'kickban': 'kb', 'forward': 'f',
'kickforward': 'kf', 'mute': 'm', 'topic': 't', 'unban': 'u',
'mode': 'm', 'invite': 'i', 'op': 'o', 'deop': 'd', 'lart': 'l',
'voice': 'v', 'devoice': 'dv', 'bans': 'bans'}
expansions = dict([x[::-1] for x in abbreviations.items()])
simple_commands = ['op', 'deop', 'voice', 'devoice']
kick_commands = ['kick', 'kickforward', 'kickban', 'lart']
forward_commands = ['kickforward', 'forward']
ban_commands = ['ban', 'forward', 'mute', 'lart', 'kickban', 'kickforward']
simple_commands += [abbreviations[x] for x in simple_commands]
kick_commands += [abbreviations[x] for x in kick_commands]
ban_commands += [abbreviations[x] for x in ban_commands]
forward_commands += [abbreviations[x] for x in forward_commands]
all_commands = abbreviations.keys() + abbreviations.values()
ban_sentinel = '!'
debug = os.path.exists(os.path.join(xchat.get_info('xchatdir'), 'chanserv.py-debug'))
# Main /cs command
def cs(word, word_eol, userdata):
chan = xchat.get_info('channel')
me = xchat.get_info('nick')
ctx = xchat.get_context()
deop = True
"""Main command dispatcher"""
if len(word) == 1:
return xchat.EAT_ALL
command = word[1].lower()
if command not in all_commands:
return xchat.EAT_NONE
args = dict(enumerate(word_eol[2:]))
me = xchat.get_info('nick')
action = Action(channel = xchat.get_info('channel'),
me = me,
context = xchat.get_context())
# The simple ones: op/voice
if command in simple_commands:
action.target = args.get(0, me)
action.deop = (action.target != me)
action.needs_op = False
command = expansions.get(command,command)
action.actions.append('chanserv %s %%(channel)s %%(target_nick)s' % command)
return action.schedule()
# Usage check
if len(word) < 3:
if command in all_commands:
xchat.emit_print("Server Error", "Not enough arguments for %s" % command)
return xchat.EAT_ALL
return xchat.EAT_NONE
if command in ('t','topic'):
action.actions.append('chanserv TOPIC %%(channel)s %s' % args[0])
action.needs_op = False
return action.schedule()
if command in ('m','mode') and args[0][0] in '+=-':
action.actions.append('MODE %%(channel)s %s' % args[0])
return action.schedule()
if command in ('i','invite'):
target = args[0]
if target.startswith('#'):
action.needs_op = False
action.actions.append('chanserv INVITE %s' % target)
else:
if target.lower() in [x.nick.lower() for x in action.context.get_list('users')]:
xchat.emit_print("Server Error", "%s is already in %s" % (target, action.channel))
return xchat.EAT_ALL
action.actions.append('INVITE %s %%(channel)s' % target)
return action.schedule()
if word[1] == '-n':
deop = False
word.remove('-n')
for w in word_eol:
if w.strip().startswith('-n'):
word_eol.remove(w)
break
comm = word[1].lower()
if comm in ['o','op']:
word_eol.append('')
if me in word_eol[2] or word_eol[2] == '':
for p in pending:
if p.channel == chan:
p.deop = False
xchat.command('chanserv OP %s %s' % (chan, word_eol[2]))
# Kick/ban/forward/mute handling
if len(word) < 4 and command in forward_commands:
xchat.emit_print("Server Error", "Not enough arguments for %s" % command)
return xchat.EAT_ALL
if comm in ['d','deop']:
if len(word) < 3: word.append(me)
if me in word[2:]:
for p in pending:
if p.channel == chan:
p.deop = True
xchat.command('chanserv OP %s %s' % (chan, ' '.join(map(lambda x: '-'+x, word[2:]))))
return xchat.EAT_ALL
# Command dispatch
# Check for -nihra argument
if command in ban_commands:
args_start = 3
while args[0].startswith('-'):
if args[0].startswith('-t'):
try:
action.timer = int(args[0][2:].split(None, 1)[0])
except ValueError:
pass
else:
action.bans = args[0][1:].split(None, 1)[0]
args = dict(enumerate(word_eol[args_start:]))
args_start += 1
if command in ('lart','l'):
action.bans = 'nihra'
if comm in ['v','voice']:
word_eol.append('')
xchat.command('chanserv VOICE %s %s' % (chan, word_eol[2]))
return xchat.EAT_ALL
# Set target
action.target = args[0].split(None,1)[0]
if comm in ['dv','devoice']:
if len(word) < 3: word.append(me)
xchat.command('chanserv VOICE %s %s' % (chan, ' '.join(map(lambda x: '-'+x, word[2:]))))
if not valid_nickname(action.target) and not valid_mask(action.target):
xchat.emit_print("Server Error", "Invalid target: %s" % action.target)
return xchat.EAT_ALL
if comm in ['k','kick']:
if len(word) < 3: return xchat.EAT_ALL
if len(word) < 4: word_eol.append('')
if word[2].lower() not in [x.nick.lower() for x in ctx.get_list('users')]:
xchat.emit_print("Server Error", "%s is not in %s" % (word[2],ctx.get_info('channel')))
return xchat.EAT_ALL
schedule(Action(ctx, KICK, word[2], word_eol[3]), deop)
return xchat.EAT_ALL
if comm in ['b','ban']:
if len(word) < 3: return xchat.EAT_ALL
schedule(Action(ctx, BAN, word[2]), deop)
if action.bans and not valid_nickname(action.target):
xchat.emit_print("Server Error", "Ban types and lart can only be used with nicks, not with complete masks")
return xchat.EAT_ALL
if comm in ['x','xban']:
if len(word) < 3: return xchat.EAT_ALL
schedule(Action(ctx, XBAN, word[2]), deop)
return xchat.EAT_ALL
if valid_mask(action.target):
action.bans = 'f'
if comm in ['n','nameban']:
if len(word) < 3: return xchat.EAT_ALL
schedule(Action(ctx, NAMEBAN, word[2]), deop)
return xchat.EAT_ALL
if not action.bans:
action.bans = 'h'
if comm in ['f','forward']:
if len(word) < 4: return xchat.EAT_ALL
if word[3][0] != '#':
xchat.emit_print("Server Error", "You can only forward to a channel");
# Find forward channel
if command in forward_commands:
action.forward_to = '$' + args[1].split(None,1)[0] # Kludge
if not valid_channel(action.forward_to[1:]):
xchat.emit_print("Server Error", "Invalid channel: %s" % action.forward_to[1:])
return xchat.EAT_ALL
schedule(Action(ctx, BAN, word[2], forward_channel=word[3]), deop)
return xchat.EAT_ALL
if comm in ['m','mute']:
if len(word) < 3: return xchat.EAT_ALL
if word[2][0] not in "+-=":
schedule(Action(ctx, MUTE, word[2]), deop)
# Check if target is there and schedule kick
if command in kick_commands:
if action.target.lower() not in [x.nick.lower() for x in action.context.get_list('users')]:
xchat.emit_print("Server Error", "%s is not in %s" % (action.target, action.channel))
return xchat.EAT_ALL
action.reason = args.get(1, 'Goodbye')
action.actions.append('remove %(channel)s %(target_nick)s :%(reason)s')
if comm in ['kb','kickban']:
if len(word) < 3: return xchat.EAT_ALL
if len(word) < 4: word_eol.append('')
schedule(Action(ctx, KICKBAN, word[2], word_eol[3]), deop)
return xchat.EAT_ALL
if command in ('m','mute'):
action.banmode = 'q'
if comm in ['kn','kicknameban']:
if len(word) < 3: return xchat.EAT_ALL
if len(word) < 4: word_eol.append('')
schedule(Action(ctx, KICKNAMEBAN, word[2], word_eol[3]), deop)
return xchat.EAT_ALL
if command in ban_commands:
action.do_ban = True
if 'n' in action.bans: action.actions.append('mode %(channel)s +%(banmode)s %(target_nick)s!*@*%(forward_to)s')
if 'i' in action.bans: action.actions.append('mode %(channel)s +%(banmode)s *!%(target_ident)s@*%(forward_to)s')
if 'h' in action.bans: action.actions.append('mode %(channel)s +%(banmode)s *!*@%(target_host)s%(forward_to)s')
if 'r' in action.bans: action.actions.append('mode %(channel)s +%(banmode)s $r:%(target_name_bannable)s%(forward_to)s')
if 'a' in action.bans: action.actions.append('mode %(channel)s +%(banmode)s $a:%(target_account)s%(forward_to)s')
if 'f' in action.bans: action.actions.append('mode %(channel)s +%(banmode)s %(target)s%(forward_to)s')
if comm in ['kf','kickforward']:
if len(word) < 4: return xchat.EAT_ALL
if word[3][0] != '#':
xchat.emit_print("Server Error", "You can only forward to a channel");
if command in ('u','unban'):
action.do_unban = True
if command == 'bans':
action.do_bans = True
action.needs_op = False
return action.schedule()
xchat.hook_command('cs',cs,"For help with /cs, please read the comments in the script")
class Action(object):
"""A list of actions to do, and information needed for them"""
def __init__(self, channel, me, context):
self.channel = channel
self.me = me
self.context = context
self.stamp = time.time()
# Defaults
self.deop = True
self.needs_op = True
self.do_ban = self.do_unban = self.do_bans = False
self.banmode = 'b'
self.reason = ''
self.bans = ''
self.actions = []
self.resolved = True
self.target = ''
self.forward_to = ''
self.timer = 0
def __str__(self):
ctx = {'channel': self.channel, 'target': self.target}
if hasattr(self, 'target_ident'):
ctx['target'] = '%s!%s@%s (r: %s a: %s)' % (self.target_nick, self.target_ident, self.target_host, self.target_name, self.target_account)
ctx['actions'] = ' | '.join(self.actions)
return "C: %(channel)s T: %(target)s A: %(actions)s" % ctx
def schedule(self, update_stamp=False):
"""Request information and add ourselves to the queue"""
if debug:
xchat.emit_print('Server Text', "Scheduling " + str(self))
if update_stamp:
self.stamp = time.time()
pending.append(self)
# Am I opped?
self.am_op = False
for user in self.context.get_list('users'):
if user.nick == self.me and user.prefix == '@':
self.am_op = True
self.deop = False
if self.needs_op and not self.am_op:
self.context.command("chanserv op %s" % self.channel)
# Find needed information
if ('a' in self.bans or 'r' in self.bans) and valid_mask(self.target) and not self.target.startswith('$'):
xchat.emit_print('Server Error', "Invalid argument %s for account/realname ban" % self.target)
return xchat.EAT_ALL
if len(word) < 5: word_eol.append('')
schedule(Action(ctx, KICKBAN, word[2], word_eol[4], word[3]), deop)
return xchat.EAT_ALL
if comm in ['u','unban']:
if len(word) < 3: return xchat.EAT_ALL
if word[2].startswith('**'):
schedule(Action(ctx, UNBAN, word_eol[2]), deop)
if self.do_ban or self.do_unban or self.do_bans:
self.resolve_nick()
else:
schedule(Action(ctx, UNBAN, word[2]), deop)
return xchat.EAT_ALL
self.target_nick = self.target
if comm in ['l','lart']:
if len(word) < 3: return xchat.EAT_ALL
if len(word_eol) < 4: word_eol.append('')
schedule(Action(ctx, LART, word[2], word_eol[3]), deop)
return xchat.EAT_ALL
if self.do_unban or self.do_bans:
self.fetch_bans()
if comm in ['t','topic']:
if len(word) < 3: return xchat.EAT_ALL
schedule(Action(ctx, TOPIC, word_eol[2]), deop)
run_pending()
return xchat.EAT_ALL
if comm in ['m','mode']:
if len(word) < 3: return xchat.EAT_ALL
schedule(Action(ctx, MODE, word_eol[2]), deop)
return xchat.EAT_ALL
def resolve_nick(self, request=True):
"""Try to find nickname, ident and host"""
self.target_nick = None
self.target_ident = None
self.target_host = None
self.target_name = None
self.target_account = None
self.resolved = False
if valid_mask(self.target):
if self.target.startswith('$a:'):
self.target_account = self.target[3:]
elif self.target.startswith('$r:'):
self.target_name = self.target[3:]
else:
self.target_nick, self.target_mask, self.target_host = re.split('[!@]', self.target)
self.resolved = True
return
if comm in ['i','invite']:
if len(word) < 3: return xchat.EAT_ALL
if word[2][0] == '#':
xchat.command('chanserv INVITE %s' % (word[2]))
self.target_nick = self.target.lower()
if self.target_nick in users:
if users[self.target_nick].time < time.time() - 10:
del users[self.target_nick]
if request:
self.context.command('whois %s' % self.target_nick)
else:
self.target_ident = users[self.target_nick].ident
self.target_host = users[self.target_nick].host
self.target_name = users[self.target_nick].name
self.target_name_bannable = re.sub('[^a-zA-Z0-9]', '?', self.target_name)
self.target_account = users[self.target_nick].account
self.resolved = True
if 'gateway/' in self.target_host and self.bans == 'h' and self.do_ban:
# For gateway/* users, default to ident ban
self.actions.append('mode %(channel)s +%(banmode)s *!%(target_ident)s@gateway/*%(forward_to)s')
self.actions.remove('mode %(channel)s +%(banmode)s *!*@%(target_host)s%(forward_to)s')
else:
if word[2].lower() in [x.nick.lower() for x in ctx.get_list('users')]:
xchat.emit_print("Server Error", "%s is already in %s" % (word[2],ctx.get_info('channel')))
return xchat.EAT_ALL
schedule(Action(ctx, INVITE, word_eol[2]), deop)
return xchat.EAT_ALL
if request:
self.context.command('whois %s' % self.target_nick)
if comm == 'bans':
if len(word) < 3: word.append(me)
schedule(Action( ctx, BANS, word[2]), deop)
return xchat.EAT_ALL
if comm in ['update']:
import thread
thread.start_new_thread(download,tuple([]))
return xchat.EAT_ALL
# /cs is an alias for chanserv too, so don't eat anything if we're not able
# to fulfill the request
return xchat.EAT_NONE
xchat.hook_command('cs',cs,"For help with /cs, please read the comments in the script")
def fetch_bans(self):
"""Read bans for a channel"""
bans[self.channel] = []
quiets[self.channel] = []
collecting_bans.append(self.channel)
self.context.command("mode %s +bq" % self.channel)
# Action class, quite powerful and extendable
class Action:
def __init__(self, ctx, typ, arg, comment='', forward_channel=''):
self.ctx = ctx
self.typ = typ
self.arg = arg
self.nick = arg.lower()
self.comment = comment
self.forward_channel = forward_channel
self.completemask = False
self.realname = ''
self.mask = None
if typ in [MUTE, BAN, XBAN, KICKBAN, UNBAN, BANS]:
if self.nick.startswith('**'):
self.realname = self.nick[2:]
self.nick = ''
self.mask = ('','')
if '!' in self.nick and '@' in self.nick and self.nick.find('!') < self.nick.find('@'):
self.nick, self.mask = self.nick.split('!',1)
self.completemask = True
if '@' in self.mask:
self.mask = list(self.mask.split('@',1))
self.channel = ctx.get_info('channel')
self.stamp = time.time()
def run(self):
# Now perform actions
if self.typ == TOPIC:
self.ctx.command("TOPIC %s" % self.arg)
if self.typ == MODE:
self.ctx.command("MODE %s %s" % (self.channel, self.arg))
if self.typ == INVITE:
self.ctx.command("INVITE %s" % (self.arg))
if self.typ == UNBAN:
"""Perform our actions"""
if debug:
xchat.emit_print('Server Text', "Running " + str(self))
kwargs = dict(self.__dict__.items())
if self.do_bans:
xchat.emit_print('Server Text', "Bans matching %s!%s@%s (r:%s, a:%s)" %
(self.target_nick, self.target_ident, self.target_host, self.target_name, self.target_account))
if self.do_unban or self.do_bans:
for b in bans[self.channel]:
if self.match(b):
if '!' in b and '@' in b:
self.ctx.command("MODE %s -b %s" % (self.channel, b))
if self.do_bans:
xchat.emit_print('Server Text', b)
else:
self.ctx.command("MODE %s -d :%s" % (self.channel, b))
self.actions.append('mode %s -b %s' % (self.channel, b))
if self.typ == BANS:
xchat.emit_print('Server Text', "Bans matching %s!%s@%s (%s)" % (self.nick, self.mask[0], self.mask[1], self.realname))
for b in bans[self.channel]:
for b in quiets[self.channel]:
if self.match(b):
xchat.emit_print('Server Text', b)
if self.typ in [KICK, KICKBAN, KICKNAMEBAN, LART]:
self.ctx.command("REMOVE %s %s :%s" % (self.channel, self.nick, self.comment))
if self.typ in [BAN, KICKBAN, MUTE, LART]:
mode = 'b'
if self.forward_channel:
self.mask[1] += '!' + self.forward_channel
if self.typ == MUTE:
mode = 'q'
if self.completemask:
self.ctx.command("MODE %s +%s %s!%s@%s" % (self.channel, mode, self.nick, self.mask[0], self.mask[1]))
else:
self.ctx.command("MODE %s +%s *!*@%s" % (self.channel, mode, self.mask[1]))
if self.typ == XBAN:
mode = 'b'
self.ctx.command("MODE %s +%s $x:*%s*" % (self.channel, mode, self.nick))
if self.do_bans:
xchat.emit_print('Server Text', b + ' (quiet)')
else:
self.actions.append('mode %s -q %s' % (self.channel, b))
# Perform all registered actions
for action in self.actions:
if '%(target_account)s' in action and not self.target_account:
xchat.emit_print('Server Text', "Can't do an account ban for %s, not identified" % self.target_nick)
continue
self.context.command(action % kwargs)
self.done()
def done(self):
"""Finaliazation and cleanup"""
# Done!
if debug:
xchat.emit_print('Server Text', "Done " + str(self))
pending.remove(self)
# Deop?
if not self.am_op or not self.needs_op:
return
if self.typ in [NAMEBAN, KICKNAMEBAN, LART]:
self.ctx.command("MODE %s +d %s" % (self.channel, self.realname.replace(' ','?')))
for p in pending:
if p.channel == self.channel and p.needs_op or not p.deop:
self.deop = False
break
if self.typ == LART:
# Still todo: ident ban and nick ban
self.ctx.command("MODE %s +b %s!*@*" % (self.channel, self.nick))
self.ctx.command("MODE %s +b *!%s@*" % (self.channel, self.mask[0]))
if self.deop:
self.context.command("chanserv deop %s" % self.channel)
# Schedule removal?
if self.timer:
action = Action(self.channel, self.me, self.context)
action.deop = self.deop
action.actions = [x.replace('+','-',1) for x in self.actions]
action.target = action.target_nick = self.target_nick
action.target_ident = self.target_ident
action.target_host = self.target_host
action.target_name = self.target_name
action.target_name_bannable = self.target_name_bannable
action.target_account = self.target_account
action.resolved = True
action.banmode = self.banmode
action.needs_op = True
xchat.hook_timer(self.timer * 1000, lambda act: act.schedule(update_stamp=True) and False, action)
def match(self, ban):
if '!' in ban and '@' in ban: # Not 100% reliable but it'll do
try:
nick, host = ban.split('!')[:2] # Trim !#foo channel forward
ident, host = host.split('@')
except:
# If this happens, the ban is invalid and we should remove it anyway
return True
if nick[0] == '%':
nick = nick[1:]
for mtch, me in [(nick, self.nick), (ident, self.mask[0]), (host, self.mask[1])]:
mtch = '^%s$' % re.escape(mtch).replace(r'\*','.*').replace(r'\?','.')
if not re.match(mtch,me,re.I):
return False
return True
mtch = '^%s$' % re.escape(ban).replace(r'\*','.*').replace(r'\?','.')
if not re.match(mtch,self.realname,re.I):
return False
return True
def n2a(self,request=False):
if self.nick in users:
if users[self.nick][0]:
if users[self.nick][3] > time.time() - 10:
self.mask = list(users[self.nick][0:2])
self.realname = users[self.nick][2]
return
if request:
self.ctx.command('whois %s' % self.nick)
self.mask = None
def schedule(event, deop):
# Add event to the pending queue and make sure all neccessary commands are
# issued. Don't op if not sure the nick is there
pending.append(event)
# Am I op?
for user in event.ctx.get_list('users'):
if user.nick == event.ctx.get_info('nick') and user.prefix == '@':
event.am_op = True
break
else:
event.am_op = False
# Deop afterwards?
event.deop = deop
if event.deop:
event.deop = not event.am_op
for p in pending:
if p.channel == event.channel and p.deop:
event.deop = True
# Do I know the nick
if event.typ in (BAN, XBAN, KICKBAN, MUTE, UNBAN, BANS) and not event.mask:
event.n2a(request=True)
if event.typ in (NAMEBAN, KICKNAMEBAN) and not event.realname:
event.n2a(request=True)
if event.typ == LART and (not event.mask or not event.realname):
event.n2a(request=True)
# Do I have all bans
if event.typ in [UNBAN, BANS] and event.channel not in bans:
_bans[event.channel] = []
event.ctx.command("MODE %s =bd" % event.channel)
# Do I have all modes
if event.typ in [TOPIC, MODE] and event.channel not in modes:
event.ctx.command("MODE %s" % event.channel)
run_pending()
"""Does a ban match this action"""
if ban.startswith('$r:') and self.target_name:
return ban2re(ban[3:]).match(self.target_name)
elif ban.startswith('$a:') and self.target_account:
return ban2re(ban[3:]).match(self.target_account)
else:
if '#' in ban:
ban = ban[:ban.find('$#')]
return ban2re(ban).match('%s!%s@%s' % (self.target_nick, self.target_ident, self.target_host))
def run_pending(just_opped = None):
"""Check all actions and run them if all information is there"""
now = time.time()
for p in pending:
if p.channel == just_opped:
p.am_op = True
if p.target_nick in users and not p.resolved:
p.resolve_nick(request = False)
# Timeout?
if p.stamp < time.time() - 10:
if p.deop and len([x for x in pending if x.channel == p.channel]) == 0:
p.ctx.command('chanserv OP %s -%s' % (p.channel, p.ctx.get_info('nick')))
pending.remove(p)
if p.stamp < now - 10:
p.done()
continue
if p.channel == just_opped:
p.am_op = True
if p.typ in (BAN, XBAN, KICKBAN, MUTE, UNBAN, LART, BANS) and not p.mask:
p.n2a()
if p.typ in (NAMEBAN, KICKNAMEBAN, LART) and not p.realname:
p.n2a()
# Run!
# Mode check here! TODO
if p.typ == MODE and p.channel in modes:
if not modes[p.channel].would_change([x for x in p.arg.split() if x]):
pending.remove(p)
return
if (p.typ in (BAN, XBAN, KICKBAN, MUTE) and p.mask) or \
(p.typ in (NAMEBAN, KICKNAMEBAN) and p.realname) or \
(p.typ in (UNBAN,BANS) and p.channel in bans and p.mask) or \
(p.typ in (MODE, TOPIC) and p.channel in modes) or \
(p.typ == LART and p.realname and p.mask) or \
p.typ in (KICK,INVITE):
if p.am_op or (p.typ == TOPIC and 't' not in modes[p.channel].modeset) or (p.typ == BANS):
p.run()
pending.remove(p)
if p.typ in (UNBAN,BANS) and len([x for x in pending if x.channel == p.channel and x.typ in (UNBAN,BANS)]) == 0:
bans.pop(p.channel)
if p.deop and len([x for x in pending if x.channel == p.channel]) == 0:
p.ctx.command('chanserv OP %s -%s' % (p.channel, p.ctx.get_info('nick')))
else:
p.ctx.command('chanserv OP %s %s' % (p.channel, p.ctx.get_info('nick')))
# Run commands after chanserv ops
can_run = not (p.channel in collecting_bans and (p.do_unban or p.do_bans))
if can_run and p.resolved and (p.am_op or not p.needs_op):
p.run()
# Helper functions
def ban2re(data):
return re.compile('^' + re.escape(data).replace(r'\*','.*').replace(r'\?','.') + '$')
_valid_nickname = re.compile(r'^[-a-zA-Z0-9\[\]{}`|_^\\]{0,30}$')
valid_nickname = lambda data: _valid_nickname.match(data)
_valid_channel = re.compile(r'^[#~].*') # OK, this is cheating
valid_channel = lambda data: _valid_channel.match(data)
_valid_mask = re.compile(r'^([-a-zA-Z0-9\[\]{}`|_^\\*?]{0,30}!.*?@.*?|\$[ar]:.*)$')
valid_mask = lambda data: _valid_mask.match(data)
# Data processing
def do_mode(word, word_eol, userdata):
"""Run pending actions when chanserv opped us"""
ctx = xchat.get_context()
if 'chanserv!' in word[0].lower() and '+o' in word[3] and ctx.get_info('nick') in word:
run_pending(just_opped = ctx.get_info('channel'))
xchat.hook_server('MODE', do_mode)
# Run commands after /whois returns data
class User(object):
def __init__(self, nick, ident, host, name):
self.nick = nick; self.ident = ident; self.host = host; self.name = name
self.account = None
self.time = time.time()
def do_whois(word, word_eol, userdata):
users[word[3].lower()] = (word[4], word[5], word_eol[7][1:], time.time())
run_pending()
"""Store whois replies in global cache"""
nick = word[3].lower()
if word[1] == '330':
users[nick].account = word[4]
else:
users[nick] = User(nick, word[4], word[5], word_eol[7][1:])
xchat.hook_server('311', do_whois)
xchat.hook_server('330', do_whois)
xchat.hook_server('314', do_whois) # This actually is a /whowas reply
# Do /whowas is /whois fails
def do_missing(word, word_eol, userdata):
"""Fall back to whowas if whois fails"""
for p in pending:
if p.nick == word[3]:
p.ctx.command('whowas %s' % word[3])
if p.target == word[3]:
p.context.command('whowas %s' % word[3])
break
xchat.hook_server('401', do_missing)
# Display an error if /whowas also fails
def do_endwas(word, word_eol, userdata):
"""Display error if nickname cannot be resolved"""
for p in pending:
if p.nick == word[3]:
xchat.emit_print("Server Error", "%s could not be found" % p.nick)
if p.target == word[3]:
xchat.emit_print("Server Error", "%s could not be found" % p.target)
pending.remove(p)
xchat.hook_server('406', do_endwas)
# Add ban data tot cache (reply of /mode =b)
def endofwhois(word, word_eol, userdata):
"""Process the queue after nickname resolution"""
run_pending()
xchat.hook_server('318', endofwhois)
xchat.hook_server('369', endofwhois)
xchat.hook_server('482', lambda word, word_eol, userdata: xchat.emit_print('Server Error', '%s in %s' % (word_eol[4][1:], word[3])))
def do_ban(word, word_eol, userdata):
if word[3] in _bans:
_bans[word[3]].append(word[4])
"""Process banlists"""
channel, ban = word[3:5]
if channel in collecting_bans:
bans[channel].append(ban)
return xchat.EAT_ALL
return xchat.EAT_NONE
xchat.hook_server('367', do_ban)
# Run commands after all bans are shown
# It does mode =bd, so 2 368 have to be received before any action is taken
MARKER = '@@@@@@' # This is invalid as ban, so is safe, the +b bans come first
MARKER2 = '!!!!!!'
def do_quiet(word, word_eol, userdata):
"""Process banlists"""
channel, ban = word[3], word[5]
if channel in collecting_bans:
quiets[channel].append(ban)
return xchat.EAT_ALL
return xchat.EAT_NONE
xchat.hook_server('728', do_quiet)
def do_endban(word, word_eol, userdata):
if word[3] in _bans:
if MARKER in _bans[word[3]]:
_bans[word[3]].remove(MARKER)
bans[word[3]] = _bans[word[3]]
del(_bans[word[3]])
run_pending()
else:
_bans[word[3]].append(MARKER)
"""Process end-of-ban markers"""
channel = word[3]
if channel in collecting_bans:
return xchat.EAT_ALL
return xchat.EAT_NONE
xchat.hook_server('368', do_endban)
# Autorejoin on /remove and /kick
def do_endquiet(word, word_eol, userdata):
"""Process end-of-quiet markers"""
channel = word[3]
if channel in collecting_bans:
collecting_bans.remove(channel)
run_pending()
return xchat.EAT_ALL
return xchat.EAT_NONE
xchat.hook_server('729', do_endquiet)
# Turn on autorejoin
xchat.command('SET -quiet irc_auto_rejoin ON')
def rejoin(word, word_eol, userdata):
if word[0][1:word[0].find('!')] == xchat.get_info('nick') and word[3][1:].lower() == 'requested':
"""Rejoin when /remove'd"""
if word[0][1:word[0].find('!')] == xchat.get_info('nick') and len(word) > 3 and word[3][1:].lower() == 'requested':
xchat.command('join %s' % word[2])
xchat.hook_server('PART', rejoin)
# Try to convince chanserv to let me in
def letmein(word, word_eol, userdata):
if word[1] == '473': xchat.command('quote cs invite %s' % word[3])
elif word[1] == '474': xchat.command('quote cs unban %s' % word[3])
elif word[1] == '475': xchat.command('quote cs getkey %s' % word[3])
xchat.hook_server('473', letmein) # +i
xchat.hook_server('474', letmein) # +b
xchat.hook_server('475', letmein) # +k
def unmute(word, word_eol, userdata):
xchat.command('cs unban %s' % xchat.get_info('nick'))
xchat.hook_server('404', unmute)
class ModeSet:
def __init__(self, raw_modes=[], modeset={}, new=False):
self.modeset = dict(modeset) # Always copy
if raw_modes:
self.merge(raw_modes)
self.new = new
def merge(self, raw_modes):
newmodes = raw_modes[0]; args = raw_modes[1:]
num_plusmin = newmodes.count('+') + newmodes.count('-')
do_set = True
if len(args) > len(newmodes) - num_plusmin or newmodes[0] not in '+-':
xchat.emit_print('Server Error', 'Woah mitzy, be careful with your modes :)')
else:
for i in range(len(newmodes)):
m = newmodes[i]
if m == '+': do_set = True; num_plusmin -= 1
elif m == '-': do_set = False; num_plusmin -= 1
elif m.isalnum():
if m in 'vohbdqa':
continue
argp = len(newmodes) - i - len(args) - num_plusmin
if argp == 0:
self.modeset[m] = args[0]
args = args[1:]
else:
if do_set:
self.modeset[m] = True
elif m in self.modeset:
self.modeset.pop(m)
else:
xchat.emit_print('Server Error', 'Whoah mitzy, be careful with your modes :)')
def would_change(self, raw_modes):
# Always return true for operator/voice etc... changes
for m in raw_modes[0]:
if m in 'vohbdqa':
return True
return str(ModeSet(raw_modes, self.modeset)) != str(self)
# Unban when muted
xchat.hook_server('404', lambda word, word_eol, userdata: xchat.command('quote cs unban %s' % word[3]))
def __str__(self):
return '{' + ','.join(['%s:%s' % (x, self.modeset[x]) for x in sorted(self.modeset.keys())]) + '}'
# Convince chanserv to let me in when key/unban/invite is needed
xchat.hook_server('471', lambda word, word_eol, userdata: xchat.command('quote cs invite %s' % word[3])) # 471 = limit reached
xchat.hook_server('473', lambda word, word_eol, userdata: xchat.command('quote cs invite %s' % word[3]))
xchat.hook_server('474', lambda word, word_eol, userdata: xchat.command('quote cs unban %s' % word[3]))
xchat.hook_server('475', lambda word, word_eol, userdata: xchat.command('quote cs getkey %s' % word[3]))
def do_mode2(word, word_eol, userdata):
if userdata:
ret = xchat.EAT_NONE
c = word[3]
if c[0] != '#':
return
if c not in modes:
ret = xchat.EAT_ALL
modes[c] = ModeSet(word[4:],new=c not in modes)
run_pending()
return ret
else:
# Let's be lazy here
c = word[2]
if c[0] != '#': return
if c not in modes:
xchat.command("MODE %s" % c)
else:
modes[c].merge(word[3:])
xchat.hook_server('324',do_mode2, True)
xchat.hook_server('MODE',do_mode2, False)
def do_time(word, word_eol, userdata):
if word[3] in modes and modes[word[3]].new:
modes[word[3]].new = False
return xchat.EAT_ALL
xchat.hook_server('329',do_time, True)
def joincb(word, word_eol, userdata):
modes[word[1]] = ModeSet()
xchat.hook_command('join', joincb, priority=xchat.PRI_HIGHEST)
# Did chanserv let me in? - This function is now misnamed as it's used for akick as well
def join(word, word_eol, userdata):
global intercept_akick
def on_invite(word, word_eol, userdata):
"""Autojoin when chanserv invites us"""
if word[0] == ':ChanServ!ChanServ@services.':
if word[1] == 'INVITE': xchat.command('JOIN %s' % word[-1][1:])
if 'have been cleared' in word_eol[0]: xchat.command('JOIN %s' %word[-1])
if 'key is' in word_eol[0]: xchat.command('JOIN %s %s' % (word[4][2:-2], word[-1][2:-2]))
# Work around xchat stupidness by always writing chanserv notices to the
# current context
# Intercept akick lists if needed
if intercept_akick:
if 'AutoRemove' in word_eol[0] or 'Num Hostmask' in word_eol[0] or '--- --------' in word_eol[0]:
return xchat.EAT_ALL
if '-- End of list --' in word_eol[0]:
intercept_akick=False
# FIXME Remove marker
run_pending()
else:
# FIXME Parse hostmask and add to bans
pass
if word[1] == 'NOTICE':
xchat.emit_print("Notice", 'ChanServ', word_eol[3][2:])
return xchat.EAT_ALL
xchat.hook_server('NOTICE', join)
xchat.hook_server('INVITE', join)
def download():
import urllib2, os
xchat.emit_print('Server Text','Trying to download chanserv.py from kaarsemaker.net')
try:
fd = open(os.path.join(xchat.get_info('xchatdir'),'chanserv.py'))
old_cs = fd.read()
fd.close()
fd = urllib2.urlopen('http://media.kaarsemaker.net/chanserv.py')
new_cs = fd.read()
fd.close()
if old_cs == new_cs:
xchat.emit_print('Server Text','No new version of chanserv.py is available')
return
# Basic sanity check
if 'Seveas' in new_cs and 'chanserv.py' in new_cs and 'xchat.hook_server' in new_cs:
fd2 = open(os.path.join(xchat.get_info('xchatdir'),'chanserv.py'),'w')
fd2.write(new_cs)
fd2.close()
xchat.emit_print('Server Text','chanserv.py updated -- reload with /py reload chanserv.py')
else:
xchat.emit_print('Server Error','Downloading chanserv.py failed - downloaded file not correct')
except:
xchat.emit_print('Server Error','Failed to update chanserv.py')
xchat.command('join %s' % word[-1][1:])
xchat.hook_server('INVITE', on_invite)
def on_notice(word, word_eol, userdata):
"""Autojoin when chanserv unbans us or sent us a key"""
if word[0] != ':ChanServ!ChanServ@services.':
return
if 'Unbanned' in word_eol[0]:
xchat.command('JOIN %s' % word[6].strip()[1:-1])
if 'key is' in word_eol[0]:
xchat.command('JOIN %s %s' % (word[4][1:-1], word[-1]))
xchat.hook_server('NOTICE', on_notice)
# Spam!
xchat.emit_print('Server Text',"Loaded %s %s by Seveas <dennis@kaarsemaker.net>" % (__module_description__, __module_version__))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment