/
main.py
134 lines (116 loc) · 4.82 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import asyncio
import ssl
import logging
import yaml
# Configure the root logger at import time so the INFO-level messages
# emitted via logging.info() throughout this module are actually shown.
logging.basicConfig(level=logging.INFO)
class IRCBot:
    """Minimal asyncio IRC client that relays channel traffic to a peer bot.

    Each instance holds one connection (``reader``/``writer``) to one server
    and watches a single ``channel``.  Messages seen on that channel are
    re-sent through ``relay_bot`` (another ``IRCBot``), prefixed with this
    bot's ``network_identifier``.
    """

    # Colour codes (sent after the \x03 control character) used to tint
    # relayed nicknames; a nick always maps to the same entry.
    _NICK_COLORS = (
        '02', '03', '04', '05', '06', '07', '08',
        '09', '10', '11', '12', '13', '14', '15',
    )

    # CR/LF terminate an IRC line, so letting them through in relayed text
    # would allow a remote user to inject extra commands; NUL is dropped.
    _UNSAFE_CHARS = str.maketrans({'\r': ' ', '\n': ' ', '\0': None})

    def __init__(
        self,
        server,
        port,
        nickname,
        channel,
        network_identifier,
        relay_bot=None,
        tls=True,
        ignored_users=None,
    ):
        """Store the connection settings; no network I/O happens here.

        Args:
            server: Host name or address passed to ``asyncio.open_connection``.
            port: TCP port passed to ``asyncio.open_connection``.
            nickname: Used for both NICK and USER during ``connect``.
            channel: Channel that is joined and watched.
            network_identifier: Label placed in ``[...]`` in relayed messages.
            relay_bot: Peer ``IRCBot`` that receives relayed messages, or None.
            tls: When true, connect with ``ssl.create_default_context()``.
            ignored_users: Nicknames whose messages are not relayed.
        """
        self.server = server
        self.port = port
        self.nickname = nickname
        self.channel = channel
        self.network_identifier = network_identifier
        self.relay_bot = relay_bot
        self.reader = None
        self.writer = None
        self.tls = tls
        self.ignored_users = ignored_users if ignored_users else []

    async def connect(self):
        """Open the connection, send NICK/USER and join ``self.channel``."""
        if self.tls:
            self.reader, self.writer = await asyncio.open_connection(
                self.server, self.port, ssl=ssl.create_default_context()
            )
        else:
            self.reader, self.writer = await asyncio.open_connection(
                self.server, self.port
            )
        self.writer.write(f'NICK {self.nickname}\r\n'.encode())
        self.writer.write(
            f'USER {self.nickname} 0 * :{self.nickname}\r\n'.encode()
        )
        await self.writer.drain()
        logging.info(
            'Connected to %s:%s as %s', self.server, self.port, self.nickname
        )
        # NOTE(review): JOIN is sent before the server has confirmed
        # registration; some servers may reject it at this point - confirm
        # against the target networks.
        await self.join_channel()

    async def join_channel(self):
        """Send JOIN for ``self.channel``."""
        self.writer.write(f'JOIN {self.channel}\r\n'.encode())
        await self.writer.drain()
        logging.info('Joined channel %s', self.channel)

    async def send_message(self, message, relay=True):
        """Send ``message`` to ``self.channel`` and optionally forward it.

        Args:
            message: Text to send.  CR/LF are replaced with spaces and NUL is
                removed so the text cannot break out of the PRIVMSG line.
            relay: When true and ``relay_bot`` is set, the message is also
                passed to ``relay_bot.send_message(..., relay=False)``, i.e.
                it is forwarded exactly one further hop.
        """
        message = message.translate(self._UNSAFE_CHARS)
        self.writer.write(f'PRIVMSG {self.channel} :{message}\r\n'.encode())
        await self.writer.drain()
        logging.info('Sent message: %s', message)
        if self.relay_bot and relay:
            # Only add the network identifier if the message is not already
            # a relayed (bracket-prefixed) message.
            if not message.startswith('['):
                message = f'[{self.network_identifier}] {message}'
            await self.relay_bot.send_message(message, relay=False)

    def parse_message(self, message):
        """Split one IRC line into ``(source, command, args)``.

        Args:
            message: A single protocol line, with or without its line ending.

        Returns:
            A tuple ``(source, command, args)``.  ``source`` is the prefix
            without its leading ``:`` (or None when absent), ``command`` is
            the command word (None for a blank line or a prefix-only line)
            and ``args`` is the parameter list.  The trailing parameter is
            returned verbatim, so inner whitespace and formatting control
            characters are preserved.
        """
        line = message.rstrip('\r\n').lstrip(' ')
        if not line.strip():
            return None, None, []

        source = None
        if line.startswith(':'):
            source, _, line = line[1:].partition(' ')
            line = line.lstrip(' ')
            if not line:
                # A prefix with nothing after it: there is no command.
                return source, None, []

        # Everything after the first " :" is the trailing parameter and may
        # itself contain spaces and colons.
        head, has_trailing, trailing = line.partition(' :')
        words = [word for word in head.split(' ') if word]
        command, args = words[0], words[1:]
        if has_trailing:
            args.append(trailing)
        return source, command, args

    def _color_nick(self, nick):
        """Wrap ``nick`` in a colour code chosen from its characters."""
        index = sum(ord(char) for char in nick) % len(self._NICK_COLORS)
        return f'\u0003{self._NICK_COLORS[index]}{nick}\u0003'

    def _format_relay(self, nick, message):
        """Build the relayed text for a chat message or an ACTION."""
        colored_nick = self._color_nick(nick)
        if message.startswith('\x01ACTION'):
            action = message[len('\x01ACTION'):]
            # Strip the closing delimiter only when it is really there.
            if action.endswith('\x01'):
                action = action[:-1]
            if action.startswith(' '):
                action = action[1:]
            return f'[{self.network_identifier}] * {colored_nick} {action}'
        return f'[{self.network_identifier}] <{colored_nick}> {message}'

    async def listen(self):
        """Read lines from the server until the connection is closed.

        Answers PING, relays PRIVMSGs addressed to ``self.channel`` through
        ``relay_bot`` and follows INVITEs addressed to ``self.nickname``.
        Lines that lack the parameters a handler needs are ignored.  Returns
        when ``readline()`` reports end of stream.
        """
        while True:
            raw = await self.reader.readline()
            if not raw:
                # readline() returns b'' at EOF; without this check the loop
                # would spin forever on empty reads.
                logging.warning(
                    'Connection to %s:%s closed', self.server, self.port
                )
                return

            # IRC does not guarantee an encoding, so never let a stray byte
            # take the bot down.
            line = raw.decode('utf-8', errors='replace').rstrip('\r\n')
            logging.info('Received message: %s', line)
            source, command, args = self.parse_message(line)

            if command == 'PING':
                token = args[0] if args else ''
                self.writer.write(f'PONG :{token}\r\n'.encode())
                await self.writer.drain()
                logging.info('Sent PONG response')
            elif command == 'PRIVMSG':
                if not source or len(args) < 2:
                    continue
                # Channel names are compared case-insensitively.
                if args[0].lower() != str(self.channel).lower():
                    continue
                nick = source.split('!')[0]
                if nick in self.ignored_users:
                    continue
                if self.relay_bot:
                    await self.relay_bot.send_message(
                        self._format_relay(nick, args[1])
                    )
            elif command == 'INVITE':
                if len(args) < 2:
                    continue
                if args[0].lower() == str(self.nickname).lower():
                    self.channel = args[1]
                    await self.join_channel()
async def main():
    """Load ``config.yaml``, build the bots and run them until they stop.

    The file is expected to contain a ``bots`` list whose entries are keyword
    arguments for ``IRCBot``.  Each bot relays to the next one in the list,
    with the last one relaying back to the first.

    Raises:
        OSError: If ``config.yaml`` cannot be opened.
        KeyError: If the configuration has no ``bots`` entry.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform locale.
    with open('config.yaml', 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    # Create the bots
    bots = [IRCBot(**bot_config) for bot_config in config['bots']]

    # Chain the bots into a ring: each one relays to its successor.
    for index, bot in enumerate(bots):
        bot.relay_bot = bots[(index + 1) % len(bots)]

    # Connect every bot first, then run all listeners concurrently.
    await asyncio.gather(*(bot.connect() for bot in bots))
    await asyncio.gather(*(bot.listen() for bot in bots))
# Script entry point: start the relay only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    asyncio.run(main())