from autoLoader.config import SFTP_HOSTNAME, SFTP_USERNAME, SFTP_PASSWORD import socket def check_connection(): """Проверка доступности сервера""" sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(10) try: result = sock.connect_ex((SFTP_HOSTNAME, 22)) if result == 0: print("Порт 22 доступен") else: print(f"Порт 22 недоступен. Код ошибки: {result}") except Exception as e: print(f"Ошибка проверки соединения: {e}") finally: sock.close() # Перед подключением вызовите проверку # check_connection() import paramiko class ConnectorSFTP(): def __init__(self): self.sftp = None self.ssh = paramiko.SSHClient() def connect(self, remote_path: str): try: self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh.connect( hostname=SFTP_HOSTNAME, username=SFTP_USERNAME, password=SFTP_PASSWORD, ) self.sftp = self.ssh.open_sftp() remote_path = remote_path.lstrip('/') # Удаляем начальный слэш, если есть self.sftp.chdir(remote_path) # Переходим в директори except paramiko.AuthenticationException: print("Ошибка аутентификации. Проверьте имя пользователя и пароль.") return None, None except paramiko.SSHException as e: print(f"Ошибка SSH: {e}") return None, None except FileNotFoundError: print(f"Директория {remote_path} не найдена на сервере.") return None, None except IOError as e: print(f"Ошибка ввода-вывода: {e}") return None, None