import socket import threading import rsa import os from pathlib import Path import time from datetime import datetime import urwid import sys import locale class ChatClient: def __init__(self, host='127.0.0.1', port=5555): locale.setlocale(locale.LC_ALL, '') os.environ['LANG'] = 'ru_RU.UTF-8' self.host = host self.port = port self.nickname = None self.client_public_key = None self.client_private_key = None self.server_public_key = None self.current_chat = [] self.file_transfers = {} self.current_file = None self.input_history = [] self.history_index = -1 self.last_update = time.time() self.update_interval = 5 self.palette = [ ('header', 'white', 'dark blue'), ('chat', 'light gray', 'black'), ('input', 'white', 'black'), ('status', 'white', 'dark blue'), ('nick', 'light cyan', 'black'), ('server', 'light green', 'black'), ('file', 'yellow', 'black'), ('error', 'light red', 'black'), ('own', 'light magenta', 'black'), ('highlight', 'black', 'light gray'), ] self.generate_keys() self.setup_client() self.setup_ui() def generate_keys(self): (self.client_public_key, self.client_private_key) = rsa.newkeys(2048) def setup_client(self): self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: self.client.connect((self.host, self.port)) except Exception as e: print(f"Не удалось подключиться к серверу: {e}") sys.exit(1) def setup_ui(self): # Создаем виджеты чата self.chat_content = urwid.SimpleListWalker([]) self.chat_list = urwid.ListBox(self.chat_content) chat_frame = urwid.Frame( urwid.AttrMap(self.chat_list, 'chat'), header=urwid.AttrMap(urwid.Text(" ЧАТ "), 'header')) self.input_edit = urwid.Edit(caption="Сообщение: ") input_pile = urwid.Pile([ urwid.AttrMap(self.input_edit, 'input'), urwid.Divider('-') ]) # Статус бар self.status_text = urwid.Text("") status_bar = urwid.AttrMap(self.status_text, 'status') # Главный layout self.main_frame = urwid.Frame( body=chat_frame, footer=urwid.Pile([ input_pile, status_bar ]) ) # Главный цикл self.loop = urwid.MainLoop( self.main_frame, palette=self.palette, unhandled_input=self.handle_input ) def handle_input(self, key): if key == 'enter': self.process_command(self.input_edit.get_edit_text()) self.input_edit.set_edit_text("") self.history_index = -1 elif key == 'up': self.navigate_history(-1) elif key == 'down': self.navigate_history(1) elif key == 'f1': self.show_help() elif key == 'ctrl u': self.input_edit.set_edit_text("") def navigate_history(self, direction): if not self.input_history: return if direction == -1 and abs(self.history_index) < len(self.input_history): self.history_index -= 1 elif direction == 1 and self.history_index < -1: self.history_index += 1 if -len(self.input_history) <= self.history_index <= -1: self.input_edit.set_edit_text(self.input_history[self.history_index]) def process_command(self, msg): if not msg.strip(): return self.input_history.append(msg) if len(self.input_history) > 100: self.input_history.pop(0) if msg.lower() == '/exit': self.client.close() raise urwid.ExitMainLoop() elif msg.lower() == '/help': self.show_help() elif msg.lower() in ('y', 'n') and hasattr(self, 'current_file') and self.current_file.get('awaiting_response'): response = '/accept_file' if msg.lower() == 'y' else '/reject_file' encrypted_response = rsa.encrypt(response.encode('utf-8'), self.server_public_key) self.client.send(encrypted_response) if msg.lower() == 'y': self.add_message(f"[ФАЙЛ] Ожидайте получения файла {self.current_file['file_name']}...", 'file') else: self.add_message(f"[ФАЙЛ] Вы отказались от получения файла {self.current_file['file_name']}", 'file') del self.current_file self.set_status("") elif msg.startswith('/file'): self.send_file(msg) else: self.add_message(f"{self.nickname}: {msg}", 'own') try: encrypted_msg = rsa.encrypt(msg.encode('utf-8'), self.server_public_key) self.client.send(encrypted_msg) except Exception as e: self.add_message(f"[ОШИБКА] Не удалось отправить сообщение: {str(e)}", 'error') def send_file(self, command): try: parts = command.split(' ', 2) if len(parts) < 3: self.add_message("[ОШИБКА] Использование: /file <получатель> <путь_к_файлу>", 'error') self.set_status("Используйте: /file <ник> <путь_к_файлу>") return recipient = parts[1] file_path = parts[2] if not os.path.exists(file_path): self.add_message(f"[ОШИБКА] Файл {file_path} не найден", 'error') self.set_status(f"Файл не найден: {file_path}") return file_size = os.path.getsize(file_path) if file_size > 1024 * 1024: # 1MB лимит self.add_message("[ОШИБКА] Файл слишком большой (макс. 1MB)", 'error') self.set_status("Файл слишком большой (макс. 1MB)") return with open(file_path, 'rb') as f: file_data = f.read() file_name = os.path.basename(file_path) full_command = f"/file {recipient} {file_name} {file_data.decode('latin1')}" encrypted_msg = rsa.encrypt(full_command.encode('latin1'), self.server_public_key) self.client.send(encrypted_msg) self.add_message(f"[ФАЙЛ] Запрос на отправку {file_name} пользователю {recipient} отправлен", 'file') self.set_status(f"Запрос на отправку файла {file_name} отправлен") except Exception as e: self.add_message(f"[ОШИБКА при отправке файла] {str(e)}", 'error') self.set_status("Ошибка при отправке файла") def receive(self): try: self.server_public_key = rsa.PublicKey.load_pkcs1(self.client.recv(2048)) self.client.send(self.client_public_key.save_pkcs1()) self.set_status("Введите ваш никнейм и нажмите Enter") while not self.nickname: time.sleep(0.1) encrypted_nickname = rsa.encrypt(self.nickname.encode('utf-8'), self.server_public_key) self.client.send(encrypted_nickname) self.set_status(f"Подключено как {self.nickname}") while True: try: encrypted_msg = self.client.recv(4096) if not encrypted_msg: break msg = rsa.decrypt(encrypted_msg, self.client_private_key).decode('utf-8') if msg.startswith('/file_notification'): self.handle_file_notification(msg) elif msg.startswith('/file_data'): self.handle_file_data(msg) else: self.add_message(msg, 'server' if msg.startswith('[СЕРВЕР]') else 'nick') except Exception as e: self.add_message(f"[ОШИБКА] {str(e)}", 'error') break except Exception as e: self.add_message(f"[ОШИБКА] {str(e)}", 'error') finally: self.client.close() raise urwid.ExitMainLoop() def handle_file_notification(self, notification): parts = notification.split(' ', 2) if len(parts) < 3: return sender = parts[1] file_name = parts[2] self.add_message(f"[ФАЙЛ] {sender} хочет отправить вам файл: {file_name}", 'file') self.set_status(f"Принять файл {file_name} от {sender}? (y/n)") self.current_file = { 'sender': sender, 'file_name': file_name, 'awaiting_response': True } def handle_file_data(self, file_data): if not hasattr(self, 'current_file'): return try: downloads_dir = Path("downloads") downloads_dir.mkdir(exist_ok=True) file_path = downloads_dir / self.current_file['file_name'] with open(file_path, 'wb') as f: f.write(file_data[len('/file_data '):].encode('latin1')) self.add_message(f"[ФАЙЛ] Файл {self.current_file['file_name']} сохранен в {file_path}", 'file') self.set_status(f"Файл получен: {self.current_file['file_name']}") except Exception as e: self.add_message(f"[ОШИБКА] Не удалось сохранить файл: {str(e)}", 'error') finally: if hasattr(self, 'current_file'): del self.current_file def add_message(self, text, style=None): if style: self.chat_content.append(urwid.AttrMap(urwid.Text(text), style)) else: self.chat_content.append(urwid.Text(text)) if len(self.chat_content) > 0: self.chat_list.set_focus(len(self.chat_content) - 1) def set_status(self, message, timeout=3): self.status_text.set_text(f" {message} ") if timeout > 0: threading.Timer(timeout, lambda: self.set_status("")).start() def update_ui(self, loop=None, user_data=None): current_time = time.time() if current_time - self.last_update >= self.update_interval: self.last_update = current_time self.loop.draw_screen() self.loop.set_alarm_in(self.update_interval, self.update_ui) def show_help(self): help_text = [ "СПРАВКА ПО МЕССЕНДЖЕРУ", "", "Основные команды:", "/exit - Выйти из программы", "/help - Показать эту справку", "", "Отправка файлов:", "/file <ник> <путь> - Отправить файл указанному пользователю", "Пример: /file user2 /home/user/image.jpg", "", "При получении файла:", "1. Вы получите уведомление о входящем файле", "2. Введите 'y' для принятия или 'n' для отказа", "", "Горячие клавиши:", "F1 - Показать/скрыть справку", "Стрелки вверх/вниз - История сообщений", "Ctrl+U - Очистить поле ввода" ] help_box = urwid.ListBox(urwid.SimpleListWalker([ urwid.AttrMap(urwid.Text(line), 'highlight') for line in help_text ])) overlay = urwid.Overlay( urwid.LineBox(help_box, title="Помощь"), self.main_frame, align='center', width=('relative', 80), valign='middle', height=('relative', 80) ) def exit_help(key): if key in ('f1', 'esc', 'enter'): self.loop.widget = self.main_frame self.loop.widget = overlay self.loop.unhandled_input = exit_help def run(self): self.set_status("Введите Ваш никнейм и нажмите Enter") def set_nickname(edit, new_edit_text): if new_edit_text.strip(): self.nickname = new_edit_text.strip() self.input_edit.set_edit_text("") self.loop.unhandled_input = self.handle_input self.set_status(f"Подключено как {self.nickname}") self.input_edit.set_caption("Сообщение: ") self.loop.set_alarm_in(self.update_interval, self.update_ui) self.input_edit.set_caption("Никнейм: ") self.loop.unhandled_input = lambda key: ( key == 'enter' and set_nickname(None, self.input_edit.get_edit_text()) ) receive_thread = threading.Thread(target=self.receive, daemon=True) receive_thread.start() self.loop.run() if __name__ == "__main__": try: if len(sys.argv) > 1: host = sys.argv[1] port = int(sys.argv[2]) if len(sys.argv) > 2 else 5555 client = ChatClient(host, port) else: client = ChatClient() client.run() except KeyboardInterrupt: if hasattr(client, 'client'): client.client.close() sys.exit(0) except Exception as e: print(f"Ошибка: {str(e)}") sys.exit(1)