#!/usr/bin/env python3

#   Copyright (C) 2018      Simone Margaritelli
#                 2018      MiWCryptAnalytics
#                 2023      munix9
#                 2023      Wojtek Widomski
#                 2019-2023 Gustavo Iñiguez Goia
#
#   This file is part of OpenSnitch.
#
#   OpenSnitch is free software: you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published by
#   the Free Software Foundation, either version 3 of the License, or
#   (at your option) any later version.
#
#   OpenSnitch is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with OpenSnitch.  If not, see https://www.gnu.org/licenses/.

from PyQt5 import QtWidgets, QtCore
from PyQt5.QtNetwork import QLocalServer, QLocalSocket

import sys
import os
import signal
import argparse
import logging

from concurrent import futures

import grpc

# Make the distribution-installed packages importable before loading the
# opensnitch modules below.
dist_path = '/usr/lib/python3/dist-packages/'
if dist_path not in sys.path:
    sys.path.append(dist_path)

from opensnitch.service import UIService
from opensnitch.config import Config
from opensnitch.utils import Themes, Utils, Versions, Message
from opensnitch.utils.xdg import xdg_opensnitch_dir, xdg_current_session
import opensnitch.ui_pb2
from opensnitch.ui_pb2_grpc import add_UIServicer_to_server
from opensnitch import auth

# Name of the local socket used to detect an already running GUI instance.
app_id = os.path.join(xdg_opensnitch_dir, "io.github.evilsocket.opensnitch")

# Both are assigned in the __main__ section. They are pre-declared here so
# that on_exit() does not fail with NameError when it is invoked before the
# Qt application or the gRPC server have been created.
app = None
server = None


def on_exit():
    """Stop the gRPC server and the Qt application, then exit with status 0.

    The single-instance socket file (app_id) is removed on a best-effort
    basis. The server and the application are only stopped if they have
    already been created.
    """
    if server is not None:
        server.stop(0)
    if app is not None:
        app.quit()

    try:
        os.remove(app_id)
    except OSError:
        # best effort: the file may not exist or may not be removable.
        pass

    sys.exit(0)


def restrict_socket_perms(socket):
    """Set the mode of a unix socket file to 0o640.

    That is: owner read/write, group read, no access for others.
    Only addresses starting with "unix://" whose path exists are changed;
    any other address is left untouched. Errors are printed, not raised.

    socket -- gRPC listening address, e.g. unix:///tmp/osui.sock
    """
    try:
        if socket.startswith("unix://") and os.path.exists(socket[7:]):
            os.chmod(socket[7:], 0o640)
    except Exception as e:
        print("Unable to change unix socket permissions:", socket, e)


def check_environ():
    """Print a warning if xdg_current_session is empty."""
    if xdg_current_session == "":
        print("""

    Warning: XDG_SESSION_TYPE is not set.
    If there're no icons on the GUI, please, read the following comment:
    https://github.com/evilsocket/opensnitch/discussions/999#discussioncomment-6579273

""")


def supported_qt_version(major, medium, minor):
    """Return True if the Qt version in use is >= major.medium.minor.

    The version is compared as a whole (5.15.2 >= 5.6.3), not component by
    component. Missing components of QT_VERSION_STR are treated as 0.
    """
    current = tuple(int(part) for part in QtCore.QT_VERSION_STR.split(".")[:3])
    current += (0,) * (3 - len(current))
    return current >= (major, medium, minor)


if __name__ == '__main__':
    gui_version, grpcversion, protoversion = Versions.get()
    print("\t ~ OpenSnitch GUI -", gui_version, "~")
    print("\tprotobuf:", protoversion, "-", "grpc:", grpcversion)
    print("-" * 50, "\n")

    parser = argparse.ArgumentParser(description='OpenSnitch UI service.', formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("--socket", dest="socket", help='''
Path of the unix socket for the gRPC service (https://github.com/grpc/grpc/blob/master/doc/naming.md).
Default: unix:///tmp/osui.sock

Examples:
    - Listening on Unix socket: opensnitch-ui --socket unix:///tmp/osui.sock
        * Use unix:///run/1000/YOUR_USER/opensnitch/osui.sock for better privacy.
    - Listening on port 50051, all interfaces: opensnitch-ui --socket "[::]:50051"
                        ''', metavar="FILE")
    parser.add_argument("--socket-auth", dest="socket_auth", help="Auth type: simple, tls-simple, tls-mutual")
    parser.add_argument("--tls-ca-cert", dest="tls_ca_cert", help="path to the CA cert")
    parser.add_argument("--tls-cert", dest="tls_cert", help="path to the server cert")
    parser.add_argument("--tls-key", dest="tls_key", help="path to the server key")
    # NOTE(review): serverWorkers is parsed but not used anywhere in this
    # file; the executor below is created with its default number of workers.
    # Confirm whether that is intended.
    parser.add_argument("--max-clients", dest="serverWorkers", default=10, help="Max number of allowed clients (incoming connections).")
    parser.add_argument("--debug", dest="debug", action="store_true", help="Enable debug logs")
    parser.add_argument("--debug-grpc", dest="debug_grpc", action="store_true", help="Enable gRPC debug logs")
    parser.add_argument("--background", dest="background", action="store_true", help="Start UI in background even, when tray is not available")

    args = parser.parse_args()

    if args.debug:
        import faulthandler
        faulthandler.enable()

    logging.getLogger().disabled = not args.debug

    if args.debug and args.debug_grpc:
        os.environ["GRPC_TRACE"] = "all"
        os.environ["GRPC_VERBOSITY"] = "debug"

    os.environ["QT_AUTO_SCREEN_SCALE_FACTOR"] = "1"
    if supported_qt_version(5, 6, 0):
        try:
            # NOTE: maybe we also need Qt::AA_UseHighDpiPixmaps
            # QApplication is provided by QtWidgets, not by QtCore. The
            # attribute is set before the application object is created.
            QtWidgets.QApplication.setAttribute(QtCore.Qt.AA_EnableHighDpiScaling, True)
        except AttributeError:
            # best effort: the attribute is not available in these bindings.
            pass

    try:
        Utils.create_socket_dirs()
        app = QtWidgets.QApplication(sys.argv)

        # Single instance check: if something is already listening on app_id,
        # another GUI is running. Connecting to it triggers its
        # newConnection signal (wired to service.OpenWindow below).
        localsocket = QLocalSocket()
        localsocket.connectToServer(app_id)
        if localsocket.waitForConnected():
            raise Exception("GUI already running, opening its window and exiting.")

        localserver = QLocalServer()
        localserver.setSocketOptions(QLocalServer.UserAccessOption)
        # remove a possible stale socket before listening on it.
        localserver.removeServer(app_id)
        localserver.listen(app_id)

        if hasattr(QtCore.Qt, 'AA_UseHighDpiPixmaps'):
            app.setAttribute(QtCore.Qt.AA_UseHighDpiPixmaps, True)
        thm = Themes.instance()
        thm.load_theme(app)

        cfg = Config.get()
        if args.socket is None:
            # default
            args.socket = "unix:///tmp/osui.sock"

            # The address saved in the settings is only used when --socket
            # was not specified.
            addr = cfg.getSettings(Config.DEFAULT_SERVER_ADDR)
            if addr is not None and addr != "":
                if addr.startswith("unix://"):
                    if not os.path.exists(os.path.dirname(addr[7:])):
                        print("WARNING: unix socket path does not exist, using unix:///tmp/osui.sock, ", addr)
                    else:
                        args.socket = addr
                else:
                    args.socket = addr

        maxmsglen = cfg.getMaxMsgLength()

        service = UIService(app, on_exit, start_in_bg=args.background)
        check_environ()
        localserver.newConnection.connect(service.OpenWindow)

        # @doc: https://grpc.github.io/grpc/python/grpc.html#server-object
        server = grpc.server(futures.ThreadPoolExecutor(),
                             options=(
                                 # https://github.com/grpc/grpc/blob/master/doc/keepalive.md
                                 # https://grpc.github.io/grpc/core/group__grpc__arg__keys.html
                                 # send keepalive ping every 5 seconds (default is 2 hours)
                                 ('grpc.keepalive_time_ms', 5000),
                                 # after 5s of inactivity, wait 20s and close the connection if
                                 # there's no response.
                                 ('grpc.keepalive_timeout_ms', 20000),
                                 ('grpc.keepalive_permit_without_calls', True),
                                 ('grpc.max_send_message_length', maxmsglen),
                                 ('grpc.max_receive_message_length', maxmsglen),
                             ))

        add_UIServicer_to_server(service, server)

        # Precedence: command line option, then saved settings, then simple.
        auth_type = auth.Simple
        if args.socket_auth is not None:
            auth_type = args.socket_auth
        elif cfg.getSettings(Config.AUTH_TYPE) is not None:
            auth_type = cfg.getSettings(Config.AUTH_TYPE)

        # grpc python doesn't seem to accept unix:@address to listen on an
        # abstract unix socket, so use unix-abstract: and transform it to what
        # the Go client understands.
        abstract_prefix = "unix:@"
        if args.socket.startswith(abstract_prefix):
            # keep everything after the prefix, even if the name itself
            # contains more '@' characters.
            args.socket = "unix-abstract:{0}".format(args.socket[len(abstract_prefix):])

        print("Using server address:", args.socket, "auth type:", auth_type)

        if auth_type == auth.Simple or auth_type == "":
            server.add_insecure_port(args.socket)
        else:
            # Command line options take precedence over the saved settings.
            auth_ca_cert = args.tls_ca_cert
            auth_cert = args.tls_cert
            auth_certkey = args.tls_key
            if auth_cert is None:
                auth_cert = cfg.getSettings(Config.AUTH_CERT)
            if auth_certkey is None:
                auth_certkey = cfg.getSettings(Config.AUTH_CERTKEY)
            if auth_ca_cert is None:
                auth_ca_cert = cfg.getSettings(Config.AUTH_CA_CERT)

            tls_creds = auth.get_tls_credentials(auth_ca_cert, auth_cert, auth_certkey)
            if tls_creds is None:
                raise Exception("Invalid TLS credentials. Review the server key and cert files.")
            server.add_secure_port(args.socket, tls_creds)

        # https://stackoverflow.com/questions/5160577/ctrl-c-doesnt-work-with-pyqt
        signal.signal(signal.SIGINT, signal.SIG_DFL)

        server.start()
        # the socket file is created when the port is bound; adjust its
        # permissions once the server is started.
        restrict_socket_perms(args.socket)
        app.exec_()

    except KeyboardInterrupt:
        on_exit()
    except Exception as e:
        print(e)