mirror of
https://gitlab.com/vylion/velascobot.git
synced 2025-04-19 21:46:35 +02:00
Markov -> Generator ✔️ Chatlog -> Metadata ✔️ Scribe -> Reader ✔️ Speaker updated ✔️ Also: - Updated to python-telegram-bot v12 callbacks - Renamed many variables and rewrote much of the code to follow Python conventions - Added a new argument for filtering chat IDs - Changed how the username is obtained, from a hardcoded value to a library-provided function - Added a new argument for the bot nicknames that it can respond to
193 lines
6 KiB
Python
193 lines
6 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import random
|
|
from metadata import Metadata, parse_card_line
|
|
from generator import Generator
|
|
|
|
|
|
def get_chat_title(chat):
    """Return a display title for *chat*.

    The chat's own title wins when it is set. Otherwise the first name is
    used, followed by the last name when there is one. If neither a title
    nor a first name is available, an empty string is returned.
    """
    if chat.title is not None:
        return chat.title
    if chat.first_name is None:
        # Nothing usable to build a title from.
        return ""
    if chat.last_name is None:
        return chat.first_name
    return chat.first_name + " " + chat.last_name
|
|
|
|
|
|
class Memory(object):
    """A single remembered message: an identifier paired with its content."""

    def __init__(self, mid, content):
        # `mid` is stored under the attribute name `id`.
        self.id, self.content = mid, content
|
|
|
|
|
|
class Reader(object):
    """Chat reader: manages the parsing of messages for one specific chat
    and holds that chat's metadata.

    Incoming messages are first buffered in ``short_term_mem`` and are only
    fed to the vocabulary (``vocab``) when ``commit_memory`` runs, which
    ``archive`` does before packaging the state.
    """

    # Tags prepended to a file id when a non-text message is learned.
    TAG_PREFIX = "^IS_"
    STICKER_TAG = "^IS_STICKER^"
    ANIM_TAG = "^IS_ANIMATION^"
    VIDEO_TAG = "^IS_VIDEO^"

    def __init__(self, metadata, vocab, max_period, logger):
        """Build a Reader around existing metadata and vocabulary objects.

        metadata   -- chat metadata; its ``period`` seeds the countdown
        vocab      -- object that receives committed messages and generates text
        max_period -- upper bound applied in ``set_period``
        logger     -- logger kept for later use
        """
        self.meta = metadata
        self.vocab = vocab
        self.max_period = max_period
        self.short_term_mem = []
        self.countdown = self.meta.period
        self.logger = logger

    # The From* constructors take no instance, so they are declared static.
    # This keeps `Reader.FromX(...)` working exactly as before and also makes
    # them safe to call through an instance.

    @staticmethod
    def FromChat(chat, max_period, logger):
        """Create a new Reader from a Chat object, with a fresh Generator."""
        meta = Metadata(chat.id, chat.type, get_chat_title(chat))
        vocab = Generator()
        return Reader(meta, vocab, max_period, logger)

    @staticmethod
    def FromHistory(history, vocab, max_period, logger):
        """Create a new Reader from a whole Chat history (WIP).

        Not implemented yet: always returns None.
        """
        return None

    @staticmethod
    def FromCard(meta, vocab, max_period, logger):
        """Create a new Reader from a metadata file dump (a string)."""
        metadata = Metadata.loads(meta)
        return Reader(metadata, vocab, max_period, logger)

    @staticmethod
    def FromFile(text, max_period, logger, vocab=None):
        """Load a Reader from a file's text string (obsolete).

        The version marker is read from the first line, falling back to the
        fifth line when the parsed value is too short. Raises IndexError if
        `text` does not contain the lines being inspected.
        """
        lines = text.splitlines()
        version = parse_card_line(lines[0]).strip()
        version = version if len(version.strip()) > 1 else lines[4]
        logger.info("Dictionary version: {} ({} lines)".format(version, len(lines)))

        if version in ("v4", "v5"):
            # From v4 on, the chat metadata and the cache are no longer saved
            # together, so the text holds metadata only.
            # NOTE(review): `vocab` is passed through as given, including the
            # default None -- confirm callers always supply one here.
            return Reader.FromCard(text, vocab, max_period, logger)

        # version -> (number of leading metadata lines, first cache line index)
        layouts = {"v3": (8, 9), "v2": (7, 8), "dict:": (6, 6)}
        if version in layouts:
            meta_end, cache_start = layouts[version]
            meta = Metadata.loadl(lines[0:meta_end])
            cache = '\n'.join(lines[cache_start:])
            vocab = Generator.loads(cache)
        else:
            # Unrecognized markers are treated as the oldest list-based format
            # instead of raising an error.
            meta = Metadata.loadl(lines[0:4])
            cache = lines[4:]
            vocab = Generator(load=cache, mode=Generator.MODE_LIST)
        return Reader(meta, vocab, max_period, logger)

    def archive(self):
        """Return an (id, metadata dump, vocabulary dump) tuple for the
        archivist to save to file.

        Pending short term memories are committed to the vocabulary first.
        """
        self.commit_memory()
        return (self.meta.id, self.meta.dumps(), self.vocab.dumps())

    def check_type(self, t):
        """Loose type check: True when `t` is contained in the chat type.

        Returns True for "group" even if the chat is a "supergroup".
        """
        return t in self.meta.type

    def exactly_type(self, t):
        """Strict type check: True only when `t` equals the chat type."""
        return t == self.meta.type

    def set_title(self, title):
        """Overwrite the stored chat title."""
        self.meta.title = title

    def set_period(self, period):
        """Set the chat period, capped at ``max_period``.

        When the requested period is below the current countdown, the
        countdown is lowered to it as well (but never below 1). Returns
        whatever the metadata's ``set_period`` returns.
        """
        if period < self.countdown:
            self.countdown = max(period, 1)
        return self.meta.set_period(min(period, self.max_period))

    def set_answer(self, prob):
        """Forward the answer probability to the metadata and return its result."""
        return self.meta.set_answer(prob)

    def cid(self):
        """Return the chat id as a string."""
        return str(self.meta.id)

    def count(self):
        """Return the metadata's message count."""
        return self.meta.count

    def period(self):
        """Return the metadata's period."""
        return self.meta.period

    def title(self):
        """Return the stored chat title."""
        return self.meta.title

    def answer(self):
        """Return the metadata's answer probability."""
        return self.meta.answer

    def ctype(self):
        """Return the chat type."""
        return self.meta.type

    def is_restricted(self):
        """Return the metadata's `restricted` flag."""
        return self.meta.restricted

    def toggle_restrict(self):
        """Flip the metadata's `restricted` flag."""
        self.meta.restricted = (not self.meta.restricted)

    def is_silenced(self):
        """Return the metadata's `silenced` flag."""
        return self.meta.silenced

    def toggle_silence(self):
        """Flip the metadata's `silenced` flag."""
        self.meta.silenced = (not self.meta.silenced)

    def is_answering(self):
        """Randomly decide whether to answer, based on the answer probability.

        A chance of exactly 1 always answers and exactly 0 never does; any
        other value is compared against a fresh random draw.
        """
        rand = random.random()
        chance = self.answer()
        if chance == 1:
            return True
        elif chance == 0:
            return False
        return rand <= chance

    def add_memory(self, mid, content):
        """Buffer a message in short term memory."""
        mem = Memory(mid, content)
        self.short_term_mem.append(mem)

    def random_memory(self):
        """Return the id of a random short term memory, or None if there are none."""
        if not self.short_term_mem:
            return None
        mem = random.choice(self.short_term_mem)
        return mem.id

    def reset_countdown(self):
        """Restart the countdown from the metadata's period."""
        self.countdown = self.meta.period

    def read(self, message):
        """Learn from a message and increase the message count.

        Text is learned as is; stickers, animations and videos are learned
        as their tag followed by the file id. Other messages teach nothing.
        """
        mid = str(message.message_id)

        if message.text is not None:
            self.learn(mid, message.text)
        elif message.sticker is not None:
            self.learn_drawing(mid, Reader.STICKER_TAG, message.sticker.file_id)
        elif message.animation is not None:
            self.learn_drawing(mid, Reader.ANIM_TAG, message.animation.file_id)
        elif message.video is not None:
            self.learn_drawing(mid, Reader.VIDEO_TAG, message.video.file_id)
        self.meta.count += 1

    def learn_drawing(self, mid, tag, drawing):
        """Learn a non-text message as "<tag> <file id>"."""
        self.learn(mid, tag + " " + drawing)

    def learn(self, mid, text):
        """Buffer `text` in short term memory.

        Texts of at most three words that contain "velasco" (compared
        case-insensitively) are skipped.
        """
        if "velasco" in text.casefold() and len(text.split()) <= 3:
            return
        self.add_memory(mid, text)

    def commit_memory(self):
        """Feed every short term memory to the vocabulary, then clear the buffer."""
        for mem in self.short_term_mem:
            self.vocab.add(mem.content)
        self.short_term_mem = []

    def generate_message(self, max_len):
        """Generate a message of at most `max_len` from the vocabulary,
        passing along the chat's silenced flag."""
        return self.vocab.generate(size=max_len, silence=self.is_silenced())
|