#!/usr/bin/env python # -*- mode: python; coding: utf-8; -*- import argparse, errno, gettext, itertools, locale, os, re, readline, \ subprocess, sys, tempfile, textwrap, threading if sys.version_info[0] == 2: from Tkinter import * from tkMessageBox import * from Tix import * from ScrolledText import * from ttk import * from Queue import * def input_string (prompt=""): return raw_input (prompt) def gettext_install (*args, **kwargs): gettext.install (*args, unicode = True, **kwargs) elif sys.version_info[0] > 2: from tkinter import * from tkinter.messagebox import * from tkinter.tix import * from tkinter.scrolledtext import * from tkinter.ttk import * from queue import * def input_string (prompt=""): return input (prompt) gettext_install = gettext.install else: raise Exception ("Unsupported Python version {}".format (sys.version_info)) def dedented (text): return textwrap.dedent (text).strip () def ldedented (text): return textwrap.dedent (text).lstrip () def filled (text): return textwrap.fill (dedented (text), width = 72) def lfilled (text): return textwrap.fill (ldedented (text), width = 72) def read_input_string (prompt = "", default = ""): if default != "": readline.set_startup_hook (lambda: readline.insert_text (default)) try: return input_string(prompt) finally: readline.set_startup_hook() def parse_arguments (): parser = argparse.ArgumentParser () parser.add_argument ( "-v", "--version", dest = "version", action = "version", version = "crypto-install version GIT-TAG (GIT-COMMIT/GIT-BRANCH)", help = _ ("Display version.")) parser.add_argument ( "--no-gui", dest = "gui", action = "store_false", help = _ ("Disable GUI, use text interface.")) gnupg_group = parser.add_argument_group ( _ ("GnuPG"), _ ("Options related to the GnuPG setup.")) gnupg_group.add_argument ( "--no-gpg", dest = "gnupg", action = "store_false", help = _ ("Disable GnuPG setup.")) gnupg_group.add_argument ( "--gpg-home", dest = "gnupg_home", default = os.getenv("GNUPGHOME") or "~/.gnupg", metavar = _ ("PATH"), help = _ ("Default directory for GnuPG files.")) openssh_group = parser.add_argument_group ( _ ("OpenSSH"), _ ("Options related to the OpenSSH setup.")) openssh_group.add_argument ( "--no-ssh", dest = "openssh", action = "store_false", help = _ ("Disable OpenSSH setup.")) openssh_group.add_argument ( "--ssh-home", dest = "openssh_home", default = "~/.ssh", metavar = _ ("PATH"), help = _ ("Default directory for OpenSSH files.")) return parser.parse_args () def ensure_directories (path, mode = 0o777): try: os.makedirs (path, mode) except OSError as exception: if exception.errno != errno.EEXIST: raise def default_name (): return os.getenv ("FULLNAME") def default_email (): return os.getenv ("EMAIL") def default_comment (): return "" def default_hostname (): return subprocess.check_output ("hostname").strip () def default_username (): return os.getenv ("USER") def valid_email (value): return re.match (".+@.+", value) def valid_name (value): return value.strip () != "" def valid_user (value): return value.strip () != "" def valid_host (value): return value.strip () != "" def valid_comment (value): return True def gnupg_exists (arguments): gnupg_home = os.path.expanduser (arguments.gnupg_home) gnupg_secring = os.path.join (gnupg_home, "secring.gpg") return os.path.exists (gnupg_secring) def openssh_exists (arguments): openssh_home = os.path.expanduser (arguments.openssh_home) openssh_config = os.path.join (openssh_home, "config") openssh_key = os.path.join (openssh_home, "id_rsa") return os.path.exists (openssh_config) and os.path.exists (openssh_key) def quoted (string): return string.replace ("+", "++").replace (" ", "+") def input_passphrase (arguments): batch_passphrase = ldedented (""" RESET OPTION ttyname={} OPTION ttytype={} """).format (subprocess.check_output ("tty").strip (), os.getenv ("TERM")) expected_oks = 3 batch_env = dict (os.environ) if arguments.gui: batch_passphrase += ldedented (""" OPTION xauthority={} OPTION display={} """).format (os.getenv ("XAUTHORITY"), os.getenv ("DISPLAY")) expected_oks += 2 else: del batch_env["DISPLAY"] passphrase_process = subprocess.Popen (["gpg-agent", "--server"], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, env = batch_env) try: line = passphrase_process.stdout.readline ().decode ("UTF-8") if line != "OK Pleased to meet you\n": raise Exception ("Couldn't read expected OK.") passphrase_process.stdin.write (batch_passphrase.encode ("UTF-8")) for i in range (expected_oks): line = passphrase_process.stdout.readline ().decode ("UTF-8") if line != "OK\n": raise Exception ("Couldn't read expected OK.") error, prompt, description = "", _ ("Passphrase:"), "" while True: batch_passphrase = \ "GET_PASSPHRASE --data --repeat=1 --qualitybar X {} {} {}\n" \ .format ((error and quoted (error)) or "X", (prompt and quoted (prompt)) or "X", (description and quoted (description)) or "X") passphrase_process.stdin.write (batch_passphrase.encode ("UTF-8")) line = passphrase_process.stdout.readline ().decode ("UTF-8") if line == "OK\n": error = _ ("Empty passphrase") continue if line.startswith ("D "): passphrase = line[2:-1] if len (passphrase) < 8: error = _ ("Passphrase too short") description = _ ("Passphrase has to have at least 8 characters.") continue return passphrase if line.startswith ("ERR 83886179"): raise Exception ("Operation cancelled.") raise Exception ("Unexpected response.") finally: passphrase_process.stdin.close () passphrase_process.stdout.close () passphrase_process.stderr.close () passphrase_process.wait () def redirect_to_stdout (process): # TODO: argh. there has to be a better way process.stdin.close () while process.poll () is None: sys.stdout.write (process.stdout.readline ()) while True: line = process.stdout.readline () if len (line) == 0: break sys.stdout.write (line.decode ("UTF-8")) def gnupg_setup (arguments, name = None, email = None, comment = None): gnupg_home = os.path.expanduser (arguments.gnupg_home) gnupg_secring = os.path.join (gnupg_home, "secring.gpg") if gnupg_exists (arguments): print (_ ("GnuPG secret keyring already exists at '{}'.") .format (gnupg_secring)) return if not arguments.gui: print (filled (_ (""" No default GnuPG key available. Please enter your information to create a new key."""))) name = read_input_string (_ ("What is your name (e.g. 'John Doe')? "), default_name ()) email = read_input_string (dedented (_ (""" What is your email address (e.g. 'test@example.com')? """)), default_email ()) comment = read_input_string (dedented (_ (""" What is your comment phrase, if any (e.g. 'key for 2014')? """)), default_comment ()) if not os.path.exists (gnupg_home): print (_ ("Creating GnuPG directory at '{}'.").format (gnupg_home)) ensure_directories (gnupg_home, 0o700) with tempfile.NamedTemporaryFile () as tmp: batch_key = ldedented (""" %ask-passphrase Key-Type: DSA Key-Length: 2048 Subkey-Type: ELG-E Subkey-Length: 2048 Name-Real: {} Name-Email: {} Expire-Date: 0 """).format (name, email) if comment != "": batch_key += "Name-Comment: {}\n".format (comment) tmp.write (batch_key.encode ("UTF-8")) tmp.flush () batch_env = dict (os.environ) if not arguments.gui: del batch_env["DISPLAY"] gnupg_process = subprocess.Popen (["gpg2", "--homedir", gnupg_home, "--batch", "--gen-key", tmp.name], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, env = batch_env) redirect_to_stdout (gnupg_process) if gnupg_process.returncode != 0: raise Exception ("Couldn't create GnuPG key.") def openssh_setup (arguments, comment = None): openssh_home = os.path.expanduser (arguments.openssh_home) openssh_config = os.path.join (openssh_home, "config") if not os.path.exists (openssh_config): print (_ ("Creating OpenSSH directory at '{}'.").format (openssh_home)) ensure_directories (openssh_home, 0o700) print (_ ("Creating OpenSSH configuration at '{}'.") .format (openssh_config)) with open (openssh_config, "w") as config: config.write (ldedented (""" ForwardAgent yes ForwardX11 yes """)) openssh_key = os.path.join (openssh_home, "id_rsa") if os.path.exists (openssh_key): print (_ ("OpenSSH key already exists at '{}'.").format (openssh_key)) return openssh_key_dsa = os.path.join (openssh_home, "id_dsa") if os.path.exists (openssh_key_dsa): print (_ ("OpenSSH key already exists at '{}'.").format (openssh_key_dsa)) return print (filled (_ ("No OpenSSH key available. Generating new key."))) if not arguments.gui: comment = "{}@{}".format (default_username (), default_hostname ()) comment = read_input_string (ldedented (_ (""" What is your comment phrase (e.g. 'user@mycomputer')? """)), comment) passphrase = input_passphrase (arguments) batch_env = dict (os.environ) if not arguments.gui: del batch_env["DISPLAY"] # TODO: is it somehow possible to pass the password on stdin? openssh_process = subprocess.Popen (["ssh-keygen", "-P", passphrase, "-C", comment, "-f", openssh_key], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, env = batch_env) redirect_to_stdout (openssh_process) if openssh_process.returncode != 0: raise Exception ("Couldn't create OpenSSH key.") def _state (value): return NORMAL if value else DISABLED # http://www.blog.pythonlibrary.org/2014/07/14/tkinter-redirecting-stdout-stderr/ # http://www.virtualroadside.com/blog/index.php/2012/11/10/glib-idle_add-for-tkinter-in-python/ class RedirectText (object): def __init__ (self, root, widget): self.root = root self.widget = widget self.queue = Queue () def write (self, string): self.widget.insert (END, string) def enqueue (self, value): self.queue.put (value) self.root.event_generate ("<>", when = "tail") class CryptoInstallProgress (Toplevel): def __init__ (self, parent): Toplevel.__init__ (self, parent) self.parent = parent self.create_widgets () def create_widgets (self): self.balloon = Balloon (self, initwait = 250) self.text = ScrolledText (self) self.text.pack (fill = BOTH, expand = True) self.redirect = RedirectText (self.parent, self.text) self._quit = Button (self, text = _ ("Quit"), command = self.maybe_quit) self.balloon.bind_widget (self._quit, msg = _ ("Quit the program immediately")) self._quit.pack () def update_widgets (self): if self.parent.state () == "normal": self._quit["text"] = _ ("Close") self.balloon.bind_widget (self._quit, msg = _ ("Close this window")) def maybe_quit (self): (self.quit if self.parent.state () != "normal" else self.destroy) () class CryptoInstall (Tk): def __init__ (self, arguments): Tk.__init__ (self) self.style = Style () self.style.theme_use ("clam") self.style.configure ("Invalid.TLabel", foreground = "red") self.style.configure ("Invalid.TEntry", foreground = "red") self.arguments = arguments self.resizable (width = False, height = False) self.title (_ ("Crypto Install Wizard")) self.progress = None self.create_widgets () def create_widgets (self): self.fields = {} self.balloon = Balloon (self, initwait = 250) self.info_frame = Frame (self) self.info_frame.pack (fill = X) msg = dedented (_ (""" Username on the local machine (e.g. 'user') """)) self.user_label = Label (self.info_frame, text = _ ("Username")) self.balloon.bind_widget (self.user_label, msg = msg) self.user_label.grid () self.user_var = StringVar (self, default_username ()) self.user_var.trace ("w", self.update_widgets) self.user = Entry (self.info_frame, textvariable = self.user_var) self.balloon.bind_widget (self.user, msg = msg) self.user.grid (row = 0, column = 1) self.fields["user"] = [self.user_var, valid_user, self.user, self.user_label] msg = dedented (_ (""" Host name of the local machine (e.g. 'mycomputer') """)) self.host_label = Label (self.info_frame, text = _ ("Host Name")) self.balloon.bind_widget (self.host_label, msg = msg) self.host_label.grid () self.host_var = StringVar (self, default_hostname ()) self.host_var.trace ("w", self.update_widgets) self.host = Entry (self.info_frame, textvariable = self.host_var) self.balloon.bind_widget (self.host, msg = msg) self.host.grid (row = 1, column = 1) self.fields["host"] = [self.host_var, valid_host, self.host, self.host_label] msg = dedented (_ (""" Full name as it should appear in the key description (e.g. 'John Doe') """)) self.name_label = Label (self.info_frame, text = _ ("Full Name")) self.balloon.bind_widget (self.name_label, msg = msg) self.name_label.grid () self.name_var = StringVar (self, default_name ()) self.name_var.trace ("w", self.update_widgets) self.name = Entry (self.info_frame, textvariable = self.name_var) self.balloon.bind_widget (self.name, msg = msg) self.name.grid (row = 2, column = 1) self.fields["name"] = [self.name_var, valid_name, self.name, self.name_label] msg = dedented (_ (""" Email address associated with the name (e.g. '') """)) self.email_label = Label (self.info_frame, text = _ ("Email address")) self.balloon.bind_widget (self.email_label, msg = msg) self.email_label.grid () self.email_var = StringVar (self, default_email ()) self.email_var.trace ("w", self.update_widgets) self.email = Entry (self.info_frame, textvariable = self.email_var) self.balloon.bind_widget (self.email, msg = msg) self.email.grid (row = 3, column = 1) self.fields["email"] = [self.email_var, valid_email, self.email, self.email_label] msg = dedented (_ (""" Comment phrase for the GnuPG key, if any (e.g. 'key for 2014') """)) self.comment_label = Label (self.info_frame, text = _ ("Comment phrase")) self.balloon.bind_widget (self.comment_label, msg = msg) self.comment_label.grid () self.comment_var = StringVar (self, default_comment ()) self.comment_var.trace ("w", self.update_widgets) self.comment = Entry (self.info_frame, textvariable = self.comment_var) self.balloon.bind_widget (self.comment, msg = msg) self.comment.grid (row = 4, column = 1) self.fields["comment"] = [self.comment_var, valid_comment, self.comment, self.comment_label] self.options_frame = Frame (self) self.options_frame.pack (fill = X) self.gnupg_label = Label (self.options_frame, text = _ ("Generate GnuPG key")) self.gnupg_label.grid () self.gnupg_var = IntVar (self, 1 if self.arguments.gnupg else 0) self.gnupg_var.trace ("w", self.update_widgets) self.gnupg = Checkbutton (self.options_frame, variable = self.gnupg_var) self.gnupg.grid (row = 0, column = 1) self.openssh_label = Label (self.options_frame, text = _ ("Generate OpenSSH key")) self.openssh_label.grid () self.openssh_var = IntVar (self, 1 if self.arguments.openssh else 0) self.openssh_var.trace ("w", self.update_widgets) self.openssh = Checkbutton (self.options_frame, variable = self.openssh_var) self.openssh.grid (row = 1, column = 1) self.button_frame = Frame (self) self.button_frame.pack (fill = X) self._generate = Button (self.button_frame, text = _ ("Generate Keys"), command = self.generate) self.balloon.bind_widget ( self._generate, msg = _ ("Generate the keys as configured above")) self._generate.pack (side = LEFT, fill = Y) self._quit = Button (self.button_frame, text = _ ("Quit"), command = self.quit) self.balloon.bind_widget (self._quit, msg = _ ("Quit the program immediately")) self._quit.pack (side = LEFT) self.update_widgets () def valid_state (self): if not self.openssh_var.get () and not self.gnupg_var.get (): return False if gnupg_exists (self.arguments) and openssh_exists (self.arguments): return False if not valid_name (self.user_var.get ()): return False if not valid_host (self.host_var.get ()): return False if not valid_email (self.email_var.get ()): return False if not valid_name (self.name_var.get ()): return False if not valid_comment (self.comment_var.get ()): return False return True def update_field (self, name): field = self.fields[name] valid = field[1] (field[0].get ()) field[2]["style"] = "" if valid else "Invalid.TEntry" field[3]["style"] = "" if valid else "Invalid.TLabel" def update_widgets (self, *args): self._generate["state"] = _state (self.valid_state ()) for field in ["user", "host", "name", "email", "comment"]: self.update_field (field) self.gnupg["state"] = _state (not gnupg_exists (self.arguments)) self.openssh["state"] = _state (not openssh_exists (self.arguments)) gnupg_key = self.name_var.get ().strip () comment = self.comment_var.get ().strip () if comment != "": gnupg_key + " ({}) ".format (comment) gnupg_key += "<{}>".format (self.email_var.get ().strip ()) user = self.user_var.get ().strip () host = self.host_var.get ().strip () openssh_key = "{}@{}".format (user, host) msg = dedented (_ (""" Generate a GnuPG key for '{}' and configure a default setup for it """)).format (gnupg_key) self.balloon.bind_widget (self.gnupg, msg = msg) self.balloon.bind_widget (self.gnupg_label, msg = msg) msg = dedented (_ (""" Generate an OpenSSH key for '{}' and configure a default setup for it """)).format (openssh_key) self.balloon.bind_widget (self.openssh, msg = msg) self.balloon.bind_widget (self.openssh_label, msg = msg) def generate_thread (self, gnupg, openssh, name, email, comment, user, host): stdout = sys.stdout try: sys.stdout = self.progress.redirect if gnupg: gnupg_setup (self.arguments, name, email, comment) # TODO: put update into queue self.update_widgets () if openssh: comment = "{}@{}".format (user, host) openssh_setup (self.arguments, comment) # TODO: put update into queue self.update_widgets () except Exception as exception: self.deiconify () sys.stdout.write (exception) sys.stdout.write ("\n") raise finally: # TODO: put update into queue self.progress.update_widgets () sys.stdout = stdout def _on_idle (): try: while True: message = self.progress.queue.get (block = False) self.progress.redirect.write (message) except Empty: pass def generate (self): self.withdraw () if not self.progress or self.progress.winfo_exists () == 0: self.progress = CryptoInstallProgress (self) self.progress.text.delete ("0.0", "end") self.bind ("<>", self._on_idle) thread = threading.Thread (target = self.generate_thread, args = (self.gnupg_var.get (), self.openssh_var.get (), self.name_var.get (), self.email_var.get (), self.comment_var.get (), self.user_var.get (), self.host_var.get ())) thread.start () def main (): locale.setlocale (locale.LC_ALL, "") gettext_install ("crypto-install", localedir = os.getenv ("TEXTDOMAINDIR")) arguments = parse_arguments () if arguments.gui: # TODO: use gtk instead? would be more consistent with the pinentry style # (assuming it's using gtk) CryptoInstall (arguments).mainloop () else: if arguments.gnupg: gnupg_setup (arguments) if arguments.openssh: openssh_setup (arguments) if __name__ == "__main__": main ()