CSRSA/CSRSA_client.py
2025-03-28 18:32:31 +03:00

396 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import socket
import threading
import rsa
import os
from pathlib import Path
import time
from datetime import datetime
import urwid
import sys
import locale
class ChatClient:
def __init__(self, host='127.0.0.1', port=5555):
# Настройка локализации
locale.setlocale(locale.LC_ALL, '')
os.environ['LANG'] = 'ru_RU.UTF-8'
self.host = host
self.port = port
self.nickname = None
self.client_public_key = None
self.client_private_key = None
self.server_public_key = None
self.current_chat = []
self.file_transfers = {}
self.current_file = None
self.input_history = []
self.history_index = -1
self.last_update = time.time()
self.update_interval = 5 # Интервал обновления в секундах
# Цветовая схема
self.palette = [
('header', 'white', 'dark blue'),
('chat', 'light gray', 'black'),
('input', 'white', 'black'),
('status', 'white', 'dark blue'),
('nick', 'light cyan', 'black'),
('server', 'light green', 'black'),
('file', 'yellow', 'black'),
('error', 'light red', 'black'),
('own', 'light magenta', 'black'),
('highlight', 'black', 'light gray'),
]
self.generate_keys()
self.setup_client()
self.setup_ui()
def generate_keys(self):
(self.client_public_key, self.client_private_key) = rsa.newkeys(2048)
def setup_client(self):
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.client.connect((self.host, self.port))
except Exception as e:
print(f"Не удалось подключиться к серверу: {e}")
sys.exit(1)
def setup_ui(self):
# Создаем виджеты чата
self.chat_content = urwid.SimpleListWalker([])
self.chat_list = urwid.ListBox(self.chat_content)
# Исправленная строка - правильно закрываем скобки Frame
chat_frame = urwid.Frame(
urwid.AttrMap(self.chat_list, 'chat'),
header=urwid.AttrMap(urwid.Text(" ЧАТ "), 'header'))
# Поле ввода
self.input_edit = urwid.Edit(caption="Сообщение: ")
input_pile = urwid.Pile([
urwid.AttrMap(self.input_edit, 'input'),
urwid.Divider('-')
])
# Статус бар
self.status_text = urwid.Text("")
status_bar = urwid.AttrMap(self.status_text, 'status')
# Главный layout
self.main_frame = urwid.Frame(
body=chat_frame,
footer=urwid.Pile([
input_pile,
status_bar
])
)
# Главный цикл
self.loop = urwid.MainLoop(
self.main_frame,
palette=self.palette,
unhandled_input=self.handle_input
)
def handle_input(self, key):
if key == 'enter':
self.process_command(self.input_edit.get_edit_text())
self.input_edit.set_edit_text("")
self.history_index = -1
elif key == 'up':
self.navigate_history(-1)
elif key == 'down':
self.navigate_history(1)
elif key == 'f1':
self.show_help()
elif key == 'ctrl u':
self.input_edit.set_edit_text("")
def navigate_history(self, direction):
if not self.input_history:
return
if direction == -1 and abs(self.history_index) < len(self.input_history):
self.history_index -= 1
elif direction == 1 and self.history_index < -1:
self.history_index += 1
if -len(self.input_history) <= self.history_index <= -1:
self.input_edit.set_edit_text(self.input_history[self.history_index])
def process_command(self, msg):
if not msg.strip():
return
# Добавляем в историю
self.input_history.append(msg)
if len(self.input_history) > 100:
self.input_history.pop(0)
if msg.lower() == '/exit':
self.client.close()
raise urwid.ExitMainLoop()
elif msg.lower() == '/help':
self.show_help()
elif msg.lower() in ('y', 'n') and hasattr(self, 'current_file') and self.current_file.get('awaiting_response'):
response = '/accept_file' if msg.lower() == 'y' else '/reject_file'
encrypted_response = rsa.encrypt(response.encode('utf-8'), self.server_public_key)
self.client.send(encrypted_response)
if msg.lower() == 'y':
self.add_message(f"[ФАЙЛ] Ожидайте получения файла {self.current_file['file_name']}...", 'file')
else:
self.add_message(f"[ФАЙЛ] Вы отказались от получения файла {self.current_file['file_name']}", 'file')
del self.current_file
self.set_status("")
elif msg.startswith('/file'):
self.send_file(msg)
else:
# Отображаем свое сообщение
self.add_message(f"{self.nickname}: {msg}", 'own')
# Отправляем на сервер
try:
encrypted_msg = rsa.encrypt(msg.encode('utf-8'), self.server_public_key)
self.client.send(encrypted_msg)
except Exception as e:
self.add_message(f"[ОШИБКА] Не удалось отправить сообщение: {str(e)}", 'error')
def send_file(self, command):
try:
parts = command.split(' ', 2)
if len(parts) < 3:
self.add_message("[ОШИБКА] Использование: /file <получатель> <путь_к_файлу>", 'error')
self.set_status("Используйте: /file <ник> <путь_к_файлу>")
return
recipient = parts[1]
file_path = parts[2]
if not os.path.exists(file_path):
self.add_message(f"[ОШИБКА] Файл {file_path} не найден", 'error')
self.set_status(f"Файл не найден: {file_path}")
return
file_size = os.path.getsize(file_path)
if file_size > 1024 * 1024: # 1MB лимит
self.add_message("[ОШИБКА] Файл слишком большой (макс. 1MB)", 'error')
self.set_status("Файл слишком большой (макс. 1MB)")
return
with open(file_path, 'rb') as f:
file_data = f.read()
file_name = os.path.basename(file_path)
full_command = f"/file {recipient} {file_name} {file_data.decode('latin1')}"
encrypted_msg = rsa.encrypt(full_command.encode('latin1'), self.server_public_key)
self.client.send(encrypted_msg)
self.add_message(f"[ФАЙЛ] Запрос на отправку {file_name} пользователю {recipient} отправлен", 'file')
self.set_status(f"Запрос на отправку файла {file_name} отправлен")
except Exception as e:
self.add_message(f"[ОШИБКА при отправке файла] {str(e)}", 'error')
self.set_status("Ошибка при отправке файла")
def receive(self):
try:
# Получаем публичный ключ сервера
self.server_public_key = rsa.PublicKey.load_pkcs1(self.client.recv(2048))
# Отправляем свой публичный ключ
self.client.send(self.client_public_key.save_pkcs1())
# Получаем никнейм
self.set_status("Введите ваш никнейм и нажмите Enter")
while not self.nickname:
time.sleep(0.1)
# Отправляем никнейм на сервер
encrypted_nickname = rsa.encrypt(self.nickname.encode('utf-8'), self.server_public_key)
self.client.send(encrypted_nickname)
self.set_status(f"Подключено как {self.nickname}")
while True:
try:
encrypted_msg = self.client.recv(4096)
if not encrypted_msg:
break
# Расшифровываем сообщение
msg = rsa.decrypt(encrypted_msg, self.client_private_key).decode('utf-8')
if msg.startswith('/file_notification'):
self.handle_file_notification(msg)
elif msg.startswith('/file_data'):
self.handle_file_data(msg)
else:
self.add_message(msg, 'server' if msg.startswith('[СЕРВЕР]') else 'nick')
except Exception as e:
self.add_message(f"[ОШИБКА] {str(e)}", 'error')
break
except Exception as e:
self.add_message(f"[ОШИБКА] {str(e)}", 'error')
finally:
self.client.close()
raise urwid.ExitMainLoop()
def handle_file_notification(self, notification):
parts = notification.split(' ', 2)
if len(parts) < 3:
return
sender = parts[1]
file_name = parts[2]
self.add_message(f"[ФАЙЛ] {sender} хочет отправить вам файл: {file_name}", 'file')
self.set_status(f"Принять файл {file_name} от {sender}? (y/n)")
self.current_file = {
'sender': sender,
'file_name': file_name,
'awaiting_response': True
}
def handle_file_data(self, file_data):
if not hasattr(self, 'current_file'):
return
try:
downloads_dir = Path("downloads")
downloads_dir.mkdir(exist_ok=True)
file_path = downloads_dir / self.current_file['file_name']
with open(file_path, 'wb') as f:
f.write(file_data[len('/file_data '):].encode('latin1'))
self.add_message(f"[ФАЙЛ] Файл {self.current_file['file_name']} сохранен в {file_path}", 'file')
self.set_status(f"Файл получен: {self.current_file['file_name']}")
except Exception as e:
self.add_message(f"[ОШИБКА] Не удалось сохранить файл: {str(e)}", 'error')
finally:
if hasattr(self, 'current_file'):
del self.current_file
def add_message(self, text, style=None):
if style:
self.chat_content.append(urwid.AttrMap(urwid.Text(text), style))
else:
self.chat_content.append(urwid.Text(text))
# Автопрокрутка вниз
if len(self.chat_content) > 0:
self.chat_list.set_focus(len(self.chat_content) - 1)
def set_status(self, message, timeout=3):
self.status_text.set_text(f" {message} ")
if timeout > 0:
threading.Timer(timeout, lambda: self.set_status("")).start()
def update_ui(self, loop=None, user_data=None):
"""Функция для обновления интерфейса"""
current_time = time.time()
if current_time - self.last_update >= self.update_interval:
self.last_update = current_time
# Здесь можно добавить любые обновления интерфейса
# Например, обновление времени в статус баре
self.loop.draw_screen()
# Планируем следующее обновление
self.loop.set_alarm_in(self.update_interval, self.update_ui)
def show_help(self):
help_text = [
"СПРАВКА ПО МЕССЕНДЖЕРУ",
"",
"Основные команды:",
"/exit - Выйти из программы",
"/help - Показать эту справку",
"",
"Отправка файлов:",
"/file <ник> <путь> - Отправить файл указанному пользователю",
"Пример: /file user2 /home/user/image.jpg",
"",
"При получении файла:",
"1. Вы получите уведомление о входящем файле",
"2. Введите 'y' для принятия или 'n' для отказа",
"",
"Горячие клавиши:",
"F1 - Показать/скрыть справку",
"Стрелки вверх/вниз - История сообщений",
"Ctrl+U - Очистить поле ввода"
]
help_box = urwid.ListBox(urwid.SimpleListWalker([
urwid.AttrMap(urwid.Text(line), 'highlight') for line in help_text
]))
overlay = urwid.Overlay(
urwid.LineBox(help_box, title="Помощь"),
self.main_frame,
align='center',
width=('relative', 80),
valign='middle',
height=('relative', 80)
)
def exit_help(key):
if key in ('f1', 'esc', 'enter'):
self.loop.widget = self.main_frame
self.loop.widget = overlay
self.loop.unhandled_input = exit_help
def run(self):
# Получаем никнейм
self.set_status("Введите ваш никнейм и нажмите Enter")
def set_nickname(edit, new_edit_text):
if new_edit_text.strip():
self.nickname = new_edit_text.strip()
self.input_edit.set_edit_text("")
self.loop.unhandled_input = self.handle_input
self.set_status(f"Подключено как {self.nickname}")
self.input_edit.set_caption("Сообщение: ")
# Запускаем периодическое обновление интерфейса
self.loop.set_alarm_in(self.update_interval, self.update_ui)
self.input_edit.set_caption("Никнейм: ")
self.loop.unhandled_input = lambda key: (
key == 'enter' and set_nickname(None, self.input_edit.get_edit_text())
)
# Запускаем поток для получения сообщений
receive_thread = threading.Thread(target=self.receive, daemon=True)
receive_thread.start()
# Запускаем главный цикл
self.loop.run()
if __name__ == "__main__":
try:
if len(sys.argv) > 1:
host = sys.argv[1]
port = int(sys.argv[2]) if len(sys.argv) > 2 else 5555
client = ChatClient(host, port)
else:
client = ChatClient()
client.run()
except KeyboardInterrupt:
if hasattr(client, 'client'):
client.client.close()
sys.exit(0)
except Exception as e:
print(f"Ошибка: {str(e)}")
sys.exit(1)