# -*- mode: python; coding: utf-8; -*-
-import argparse, errno, gettext, itertools, os, re, readline, subprocess, \
- sys, tempfile, textwrap, threading
+import argparse, errno, gettext, itertools, locale, os, re, readline, \
+ subprocess, sys, tempfile, textwrap, threading
if sys.version_info[0] == 2:
from tkMessageBox import *
from Tix import *
from ScrolledText import *
+ from ttk import *
from Queue import *
return raw_input (prompt)
- def gettext_install ():
- gettext.install ("crypto-install", unicode = True)
+ def gettext_install (*args, **kwargs):
+ gettext.install (*args, unicode = True, **kwargs)
elif sys.version_info[0] > 2:
from tkinter import *
from tkinter.messagebox import *
from tkinter.tix import *
from tkinter.scrolledtext import *
+ from tkinter.ttk import *
from queue import *
return input (prompt)
- def gettext_install ():
- gettext.install ("crypto-install")
+ gettext_install = gettext.install
else:
raise Exception ("Unsupported Python version {}".format (sys.version_info))
"-v", "--version",
dest = "version",
action = "version",
- version = "crypto-install.py version GIT-TAG (GIT-COMMIT/GIT-BRANCH)",
+ version = "crypto-install version GIT-TAG (GIT-COMMIT/GIT-BRANCH)",
help = _ ("Display version."))
parser.add_argument (
"--no-gui",
"--no-gpg",
dest = "gnupg",
action = "store_false",
- help = "Disable GnuPG setup.")
+ help = _ ("Disable GnuPG setup."))
gnupg_group.add_argument (
"--gpg-home",
dest = "gnupg_home",
default = os.getenv("GNUPGHOME") or "~/.gnupg",
- metavar = "PATH",
+ metavar = _ ("PATH"),
help = _ ("Default directory for GnuPG files."))
openssh_group = parser.add_argument_group (
_ ("OpenSSH"),
"--ssh-home",
dest = "openssh_home",
default = "~/.ssh",
- metavar = "PATH",
+ metavar = _ ("PATH"),
help = _ ("Default directory for OpenSSH files."))
return parser.parse_args ()
return os.path.exists (openssh_config) and os.path.exists (openssh_key)
-# TODO: verify phrase at least once
-# TODO: use better labels
+def quoted (string):
+ return string.replace ("+", "++").replace (" ", "+")
+
+
def input_passphrase (arguments):
batch_passphrase = ldedented ("""
RESET
""").format (subprocess.check_output ("tty").strip (),
os.getenv ("TERM"))
+ expected_oks = 3
+
batch_env = dict (os.environ)
if arguments.gui:
batch_passphrase += ldedented ("""
OPTION display={}
""").format (os.getenv ("XAUTHORITY"),
os.getenv ("DISPLAY"))
+ expected_oks += 2
else:
del batch_env["DISPLAY"]
- batch_passphrase += \
- "GET_PASSPHRASE --data --check --qualitybar X X Passphrase X\n"
-
passphrase_process = subprocess.Popen (["gpg-agent", "--server"],
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
env = batch_env)
- (stdout, stderr) = passphrase_process.communicate (batch_passphrase.encode ("UTF-8"))
- if passphrase_process.returncode != 0:
- raise Exception ("Couldn't read passphrase.")
+ try:
+ line = passphrase_process.stdout.readline ().decode ("UTF-8")
+ if line != "OK Pleased to meet you\n":
+ raise Exception ("Couldn't read expected OK.")
- for line in stdout.splitlines ():
- if line.decode ("UTF-8").startswith ("D "):
- return line[2:]
+ passphrase_process.stdin.write (batch_passphrase.encode ("UTF-8"))
- return ""
+ for i in range (expected_oks):
+ line = passphrase_process.stdout.readline ().decode ("UTF-8")
+ if line != "OK\n":
+ raise Exception ("Couldn't read expected OK.")
+
+ error, prompt, description = "", _ ("Passphrase:"), ""
+
+ while True:
+ batch_passphrase = \
+ "GET_PASSPHRASE --data --repeat=1 --qualitybar X {} {} {}\n" \
+ .format ((error and quoted (error)) or "X",
+ (prompt and quoted (prompt)) or "X",
+ (description and quoted (description)) or "X")
+
+ passphrase_process.stdin.write (batch_passphrase.encode ("UTF-8"))
+
+ line = passphrase_process.stdout.readline ().decode ("UTF-8")
+
+ if line == "OK\n":
+ error = _ ("Empty passphrase")
+ continue
+
+ if line.startswith ("D "):
+ passphrase = line[2:-1]
+
+ if len (passphrase) < 8:
+ error = _ ("Passphrase too short")
+ description = _ ("Passphrase has to have at least 8 characters.")
+ continue
+
+ return passphrase
+
+ if line.startswith ("ERR 83886179"):
+ raise Exception ("Operation cancelled.")
+
+ raise Exception ("Unexpected response.")
+ finally:
+ passphrase_process.stdin.close ()
+ passphrase_process.stdout.close ()
+ passphrase_process.stderr.close ()
+
+ passphrase_process.wait ()
def redirect_to_stdout (process):
return NORMAL if value else DISABLED
-def _valid (value):
- return "black" if value else "red"
-
-
-def setitem (object, name, value):
- return object.__setitem__ (name, value)
-
-
-def setitems (name, value, objects):
- return map (lambda object: setitem (object, name, value), objects)
-
-
-def _fg (value, *objects):
- setitems ("fg", value, objects)
-
-
# http://www.blog.pythonlibrary.org/2014/07/14/tkinter-redirecting-stdout-stderr/
# http://www.virtualroadside.com/blog/index.php/2012/11/10/glib-idle_add-for-tkinter-in-python/
class RedirectText (object):
self._quit = Button (self,
text = _ ("Quit"),
- command = self.quit)
+ command = self.maybe_quit)
self.balloon.bind_widget (self._quit,
msg = _ ("Quit the program immediately"))
self._quit.pack ()
+ def update_widgets (self):
+ if self.parent.state () == "normal":
+ self._quit["text"] = _ ("Close")
+ self.balloon.bind_widget (self._quit,
+ msg = _ ("Close this window"))
+
+ def maybe_quit (self):
+ (self.quit if self.parent.state () != "normal" else self.destroy) ()
+
class CryptoInstall (Tk):
def __init__ (self, arguments):
Tk.__init__ (self)
+ self.style = Style ()
+ self.style.theme_use ("clam")
+ self.style.configure ("Invalid.TLabel", foreground = "red")
+ self.style.configure ("Invalid.TEntry", foreground = "red")
+
self.arguments = arguments
self.resizable (width = False, height = False)
self.title (_ ("Crypto Install Wizard"))
+ self.progress = None
+
self.create_widgets ()
def create_widgets (self):
self.info_frame = Frame (self)
self.info_frame.pack (fill = X)
- self.user_label = Label (self.info_frame, text = ("Username"))
+ msg = dedented (_ ("""
+ Username on the local machine (e.g. 'user')
+ """))
+ self.user_label = Label (self.info_frame, text = _ ("Username"))
+ self.balloon.bind_widget (self.user_label, msg = msg)
self.user_label.grid ()
self.user_var = StringVar (self, default_username ())
self.user_var.trace ("w", self.update_widgets)
self.user = Entry (self.info_frame, textvariable = self.user_var)
- self.balloon.bind_widget (self.user, msg = dedented (_ ("""
- Username on the local machine (e.g. 'user')
- """)))
+ self.balloon.bind_widget (self.user, msg = msg)
self.user.grid (row = 0, column = 1)
self.fields["user"] = [self.user_var, valid_user,
self.user, self.user_label]
+ msg = dedented (_ ("""
+ Host name of the local machine (e.g. 'mycomputer')
+ """))
self.host_label = Label (self.info_frame, text = _ ("Host Name"))
+ self.balloon.bind_widget (self.host_label, msg = msg)
self.host_label.grid ()
self.host_var = StringVar (self, default_hostname ())
self.host_var.trace ("w", self.update_widgets)
self.host = Entry (self.info_frame, textvariable = self.host_var)
- self.balloon.bind_widget (self.host, msg = dedented (_ ("""
- Host name of the local machine (e.g. 'mycomputer')
- """)))
+ self.balloon.bind_widget (self.host, msg = msg)
self.host.grid (row = 1, column = 1)
self.fields["host"] = [self.host_var, valid_host,
self.host, self.host_label]
+ msg = dedented (_ ("""
+ Full name as it should appear in the key description (e.g. 'John Doe')
+ """))
self.name_label = Label (self.info_frame, text = _ ("Full Name"))
+ self.balloon.bind_widget (self.name_label, msg = msg)
self.name_label.grid ()
self.name_var = StringVar (self, default_name ())
self.name_var.trace ("w", self.update_widgets)
self.name = Entry (self.info_frame, textvariable = self.name_var)
- self.balloon.bind_widget (self.name, msg = dedented (_ ("""
- Full name as it should appear in the key description (e.g. 'John Doe')
- """)))
+ self.balloon.bind_widget (self.name, msg = msg)
self.name.grid (row = 2, column = 1)
self.fields["name"] = [self.name_var, valid_name,
self.name, self.name_label]
+ msg = dedented (_ ("""
+ Email address associated with the name (e.g. '<test@example.com>')
+ """))
self.email_label = Label (self.info_frame, text = _ ("Email address"))
+ self.balloon.bind_widget (self.email_label, msg = msg)
self.email_label.grid ()
self.email_var = StringVar (self, default_email ())
self.email_var.trace ("w", self.update_widgets)
self.email = Entry (self.info_frame, textvariable = self.email_var)
- self.balloon.bind_widget (self.email, msg = dedented (_ ("""
- Email address associated with the name (e.g. '<test@example.com>')
- """)))
+ self.balloon.bind_widget (self.email, msg = msg)
self.email.grid (row = 3, column = 1)
self.fields["email"] = [self.email_var, valid_email,
self.email, self.email_label]
+ msg = dedented (_ ("""
+ Comment phrase for the GnuPG key, if any (e.g. 'key for 2014')
+ """))
self.comment_label = Label (self.info_frame, text = _ ("Comment phrase"))
+ self.balloon.bind_widget (self.comment_label, msg = msg)
self.comment_label.grid ()
self.comment_var = StringVar (self, default_comment ())
self.comment_var.trace ("w", self.update_widgets)
self.comment = Entry (self.info_frame, textvariable = self.comment_var)
- self.balloon.bind_widget (self.comment, msg = dedented (_ ("""
- Comment phrase for the GnuPG key, if any (e.g. 'key for 2014')
- """)))
+ self.balloon.bind_widget (self.comment, msg = msg)
self.comment.grid (row = 4, column = 1)
self.fields["comment"] = [self.comment_var, valid_comment,
def update_field (self, name):
field = self.fields[name]
- _fg (_valid (field[1] (field[0].get ())), field[2], field[3])
+ valid = field[1] (field[0].get ())
+
+ field[2]["style"] = "" if valid else "Invalid.TEntry"
+ field[3]["style"] = "" if valid else "Invalid.TLabel"
def update_widgets (self, *args):
self._generate["state"] = _state (self.valid_state ())
self.balloon.bind_widget (self.openssh, msg = msg)
self.balloon.bind_widget (self.openssh_label, msg = msg)
- def generate_thread (self):
+ def generate_thread (self, gnupg, openssh, name, email, comment, user,
+ host):
stdout = sys.stdout
try:
sys.stdout = self.progress.redirect
- # TODO: capture and show stdout and stderr
- if self.gnupg_var.get ():
- # TODO: make get calls thread-safe
- gnupg_setup (self.arguments,
- self.name_var.get (),
- self.email_var.get (),
- self.comment_var.get ())
+ if gnupg:
+ gnupg_setup (self.arguments, name, email, comment)
# TODO: put update into queue
self.update_widgets ()
- if self.openssh_var.get ():
- comment = "{}@{}".format (self.user_var.get (),
- self.host_var.get ())
+ if openssh:
+ comment = "{}@{}".format (user, host)
openssh_setup (self.arguments, comment)
# TODO: put update into queue
self.update_widgets ()
+ except Exception as exception:
+ self.deiconify ()
+
+ sys.stdout.write (exception)
+ sys.stdout.write ("\n")
+
+ raise
finally:
+ # TODO: put update into queue
+ self.progress.update_widgets ()
sys.stdout = stdout
def _on_idle ():
try:
while True:
- self.progress.redirect.write (self.progress.queue.get (block = False))
+ message = self.progress.queue.get (block = False)
+ self.progress.redirect.write (message)
except Empty:
pass
def generate (self):
- self.progress = CryptoInstallProgress (self)
+ self.withdraw ()
+
+ if not self.progress or self.progress.winfo_exists () == 0:
+ self.progress = CryptoInstallProgress (self)
+ self.progress.text.delete ("0.0", "end")
self.bind ("<<Idle>>", self._on_idle)
- thread = threading.Thread (target = self.generate_thread)
+ thread = threading.Thread (target = self.generate_thread,
+ args = (self.gnupg_var.get (),
+ self.openssh_var.get (),
+ self.name_var.get (),
+ self.email_var.get (),
+ self.comment_var.get (),
+ self.user_var.get (),
+ self.host_var.get ()))
thread.start ()
def main ():
- gettext_install ()
+ locale.setlocale (locale.LC_ALL, "")
+
+ gettext_install ("crypto-install", localedir = os.getenv ("TEXTDOMAINDIR"))
arguments = parse_arguments ()