diff --git a/README.md b/README.md index 53caebc..fc77859 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,8 @@ Some scripts are specific to the [Nexus DR-X](http://wb7fhc.com/nexus-dr-x.html) [VNC Server Activity Reporting script](#vnc-server-activity-script) +[USB Device Manager](#usb-device-manager-script) + ## Installation @@ -388,3 +390,33 @@ You can execute this script automatically via cron. The following example will 3 0 * * * /usr/local/bin/vnc-server-activity.sh 2>&1 >/dev/null +## USB Device Manager script + +`usb_control.py` allows you to "virtually" plug/unplug *most* USB devices remotely by using the `bind` and `unbind` feature in Linux. This can be handy when you need to remotely re-mount a USB drive or remove/insert a USB-serial or other USB adapter. + +The script can be run in 2 ways: From the command line or via a GUI. If no arguments are supplied, the script attempts to start in GUI mode. + +In GUI mode, the script will list the USB devices it finds. It will not list USB hubs, but it will list devices connected to hubs. Clicking on a device in the list toggles that device's state. The states are __Enabled__ (bound) or __Disabled__ (unbound). It will detect when devices are physically inserted or removed and automatically update the device list. + +If you run `usb_control.py` from the command line with the `-b` or `-u` options, the script will search for a device containing the string you supply. It will search the USB ID and the Tag (product description) for your string. If found, it'll enable (bind) if you supplied `-b` or disable (unbind) if you supplied `-u`. If you run it with the `-l` option, it will list the non-hub USB devices it finds. + +Run `usb_control.py -h` to see the +command line options: + + usage: usb_control.py [-h] [-v] [-l] [-b STRING] [-u STRING] + + USB Device Control + + optional arguments: + -h, --help show this help message and exit + -v, --version show program's version number and exit + -l, --list List available USB devices + -b STRING, --bind STRING + bind (enable) a usb device containing STRING (case- + insensitive) in 'lsusb' output ID or Tag fields + -u STRING, --unbind STRING + unbind (disable) a usb device containing STRING (case- + insensitive) in 'lsusb' output ID or Tag fields + + + diff --git a/usb_control.py b/usb_control.py new file mode 100755 index 0000000..b8ad345 --- /dev/null +++ b/usb_control.py @@ -0,0 +1,359 @@ +#!/usr/bin/env python3 +import sys +import signal +import os +import subprocess +import tkinter as tk +from tkinter import ttk +import tkinter.font as tkfont +import collections + +__author__ = "Steve Magnuson AG7GN" +__copyright__ = "Copyright 2020, Steve Magnuson" +__credits__ = ["Steve Magnuson"] +__license__ = "GPL" +__version__ = "1.0.1" +__maintainer__ = "Steve Magnuson" +__email__ = "ag7gn@arrl.net" +__status__ = "Production" + + +class UsbWindow(object): + """ + GUI that allows user to click on a USB device to toggle it's state + between 'bound' (enabled) or 'unbound' (disabled). + """ + treeview_font = (None, 12,) + treeview_header_font = (None, 12, 'bold') + label_font = (None, 11, 'bold') + button_font = (None, 12, 'bold') + max_label_width = 350 + min_window_width = max_label_width + 35 + max_window_height = 380 + + def __init__(self, master): + self.master = master + master.title(f"USB Device Manager - version {__version__}") + ws = master.winfo_screenwidth() + hs = master.winfo_screenheight() + pos_x = (ws // 2) - (self.min_window_width // 2) + pos_y = (hs // 2) - (self.max_window_height // 2) + master.geometry(f"+{pos_x}+{pos_y}") + # master.config(bg="skyblue") + # Make the label frame & label + self.header = ["ID", "Tag", "Device", "State"] + self.label_frame = tk.Frame(master=master, borderwidth=5) + self.label_frame.pack(side='top', fill='both', padx=5, pady=5, expand=True) + s = """Click on a USB device to toggle enable/disable state. +(Enabled = bound, Disabled = unbound)""" + msg = tk.Label(master=self.label_frame, + wraplength=self.max_label_width, + justify="center", + font=self.label_font, fg='blue', + anchor="center", text=s) + msg.pack(side='top', padx=5, pady=5) + self.list_frame = tk.Frame(master=master, borderwidth=5) + self.list_frame.pack(side='top', fill='both', padx=5, pady=5, + expand=True) + style = ttk.Style() + # Set table header font. 'None' means use the default font. + style.configure("Treeview.Heading", font=self.treeview_header_font) + # Set table contents font. + style.configure("Treeview", font=self.treeview_font) + self.tree = ttk.Treeview(master=self.list_frame, + columns=self.header, + show="headings", style="Treeview") + self.tree.bind('', self._select_item) + # NOTE: tags are broken in tkinter 8.6.9! + self.tree.tag_configure('odd', background='#E8E8E8') + self.tree.tag_configure('even', background='#DFDFDF') + vsb = tk.Scrollbar(master=self.list_frame, + orient="vertical", + command=self.tree.yview) + # hsb = tk.Scrollbar(master=self.list_frame, + # orient="horizontal", + # command=self.tree.xview) + # self.tree.configure(yscrollcommand=vsb.set, + # xscrollcommand=hsb.set) + self.tree.configure(yscrollcommand=vsb.set) + self.tree.grid(column=0, row=0, + sticky='nsew', in_=self.list_frame) + vsb.grid(column=1, row=0, sticky='ns', in_=self.list_frame) + # hsb.grid(column=0, row=1, sticky='ew', in_=self.list_frame) + self.list_frame.grid_columnconfigure(0, weight=1) + self.list_frame.grid_rowconfigure(0, weight=1) + + self.button_frame = tk.Frame(master=master, borderwidth=5) + self.button_frame.pack(side='top', fill='both', padx=5, pady=5, + expand=True) + # self.refresh_button = tk.Button(master=self.button_frame, + # text='Refresh List', + # command=lambda: self._build_tree()) + # self.refresh_button.pack(padx=5, pady=5, side='left', + # fill='both', expand=True) + self.quit_button = tk.Button(master=self.button_frame, + text='Quit', + font=self.button_font, + command=lambda: + self.master.quit()) + self.quit_button.pack(side='left', + fill='both', expand=True) + self.current_list = None + self._update_tree() + + def _build_headers(self): + """ + Constructs the Treeview table headers and auto-adjusts the + width of each header using the number of characters in the + column heading. + + :return: None + """ + for col in self.header: + self.tree.heading(col, text=col.title(), + command=lambda c=col: self._sort_by(c, 0)) + # Adjust the column's width to the header string + _width = tkfont.Font(font=self.treeview_header_font).measure(col.title() + '__') + self.tree.column(col, width=_width) + + def _build_tree(self): + """ + Calls get_usb_devices and inserts results into the tree. Column + width auto-adjusts based on the character count in the row + field with the greatest number of characters. + + :return: None + """ + self.tree.delete(*self.tree.get_children()) + i = 0 + tree_width = 0 + for item in self.current_list: + if i % 2 == 0: + self.tree.insert('', 'end', values=item, tags=('even',)) + else: + self.tree.insert('', 'end', values=item, tags=('odd',)) + i += 1 + # adjust column's width if necessary to fit each value + # Use the column's header string as a minimum width + tree_width = 0 + for ix, val in enumerate(item): + min_col_w = self.tree.column(self.header[ix], width=None) + col_w = tkfont.Font(font=self.treeview_font).measure(val + '__') + if min_col_w < col_w: + self.tree.column(self.header[ix], width=col_w) + tree_width += col_w + else: + tree_width += min_col_w + # Update root window width to accommodate the new tree width + self.master.update_idletasks() + self.master.update() + x = self.master.winfo_width() + y = self.master.winfo_height() + window_width = max([tree_width + 35, self.min_window_width]) + if x != window_width: + self.master.geometry(f"{window_width}x{y}") + + def _select_item(self, _): + selected = self.tree.focus() + if not selected: + return + _device = self.tree.item(selected)['values'][2] + _state = self.tree.item(selected)['values'][3].casefold() + if _state == "enabled": + target_state = "unbind" + else: + target_state = "bind" + set_usb_device_state(_device, target_state) + self._build_tree() + + def _update_tree(self): + """ + Checks to see if the size of the list of USB devices has changed + and if it has, refresh the tree header and list of devices. + + :return: None + """ + _latest_list = get_usb_devices() + if collections.Counter(_latest_list) != collections.Counter(self.current_list): + self.current_list = _latest_list + self._build_headers() + self._build_tree() + self.list_frame.after(1000, self._update_tree) + + def _sort_by(self, col, descending): + """ + Sorts tree contents when a column header is clicked on. + + :param col: The column to sort on + :param descending: True if descending, False if ascending sort + desired + :return: None + """ + data = [(self.tree.set(child, col), child) for child in self.tree.get_children('')] + # Sort the data in place + # if the data to be sorted is numeric change to float + # data = change_numeric(data) + data.sort(reverse=descending) + for ix, item in enumerate(data): + self.tree.move(item[1], '', ix) + # Switch the sort direction + self.tree.heading(col, + command=lambda c=col: + self._sort_by(col, int(not descending))) + + +def get_usb_devices() -> list: + """Returns list of USB devices that are eligible for binding + and unbinding. Devices with 'hub' in the description are excluded + from the list. The returned list consists of tuples, with each + tuple containing the USB device ID, Tag (Product description), Device + number, and state. State is "Enabled" if device is bound, "Disabled" + if unbound. + + :return: List of tuples of USB devices. If no devices were found, + returns empty list. + """ + import re + device_re = re.compile("Bus\\s+(?P\\d+)\\s+" + "Device\\s+(?P\\d+).+" + "ID\\s(?P\\w+:\\w+)\\s(?P.+)$", re.I) + df = subprocess.check_output("lsusb").decode('utf-8') + devices = [] + for i in df.split('\n'): + if i: + info = device_re.match(i) + if info: + dinfo = info.groupdict() + if " hub" not in dinfo['tag'].casefold(): + _bus = dinfo.pop('bus') + _device = dinfo.pop('device') + cmd = f"grep -l {_bus}/{_device} /sys/bus/usb/devices/*/uevent 2>/dev/null | tail -1" + product = subprocess.check_output(cmd, shell=True).decode('utf-8') + if product: + _p = product.split('/')[5] + if os.path.islink(f"/sys/bus/usb/drivers/usb/{_p}"): + status = "Enabled" + else: + status = 'Disabled' + devices.append((dinfo['id'], dinfo['tag'], + _p, status)) + return devices + + +def set_usb_device_state(_device: str, _action: str) -> bool: + """ + Binds (enables) or unbinds (disables) a USB device + + :param _device: Device designation + :param _action: bind|unbind Desired setting for device + :return True if _action was successful, False otherwise + """ + cmd = f"echo {_device} | sudo tee /sys/bus/usb/drivers/usb/{_action} 2>/dev/null" + try: + _result = subprocess.check_output(cmd, shell=True).decode('utf-8') + except subprocess.CalledProcessError as e: + print(f"ERROR: {e}. Do you have sudo permissions to run " + f"'sudo tee /sys/bus/usb/drivers/usb/{_action}'?", + file=sys.stderr) + return False + else: + return True + + +def find_usb_device(_device_string: str, _action: str) -> bool: + """ + Searches the ID and tag for device_string, and if a match is + found, calls set_usb_device_state to bind/unbind that USB device. + + :param _device_string: String to search for in USB device ID or Tag + (Product string) + :param _action: bind|unbind Desired setting for device + :return True if device was found AND _action was successful, False + otherwise + """ + devices = get_usb_devices() + if devices: + for d in devices: + if _device_string in d[0] or \ + _device_string.casefold() in d[1].casefold(): + if (_action == "bind" and d[3] == "Enabled") or \ + (_action == "unbind" and d[3] == "Disabled"): + print(f"ERROR: {_action} requested but device is already {d[3]}", + file=sys.stderr) + return False + _answer = set_usb_device_state(d[2], _action) + if _answer: + return True + else: + return False + return False + + +def sigint_handler(sig, frame): + print(f"Signal handler caught {sig} {frame}") + root.quit() + + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser(prog='usb_control.py', + description=f"USB Device Control") + parser.add_argument('-v', '--version', action='version', + version=f"Version: {__version__}") + parser.add_argument("-l", "--list", action='store_true', + help="List available USB devices") + parser.add_argument("-b", "--bind", + type=str, metavar="STRING", + help="bind (enable) a usb device containing " + "STRING (case-insensitive) in 'lsusb' " + "output ID or Tag fields") + parser.add_argument("-u", "--unbind", + type=str, metavar="STRING", + help="unbind (disable) a usb device containing " + "STRING (case-insensitive) in 'lsusb' " + "output ID or Tag fields") + arg_info = parser.parse_args() + if not sys.platform.startswith('linux'): + print(f"This application only works on Linux", file=sys.stderr) + sys.exit(1) + if arg_info.list: + dev_list = get_usb_devices() + if dev_list: + from tabulate import tabulate + print(tabulate(dev_list, headers=["ID", "Tag", "Device", "State"])) + sys.exit(0) + else: + print("No USB devices found", file=sys.stderr) + sys.exit(0) + if arg_info.bind: + answer = find_usb_device(arg_info.bind, "bind") + if answer: + sys.exit(0) + else: + sys.exit(1) + if arg_info.unbind: + answer = find_usb_device(arg_info.unbind, "unbind") + if answer: + sys.exit(0) + else: + sys.exit(1) + + # If we made it this far, no arguments were supplied. Attempt + # to open GUI. + if os.environ.get('DISPLAY', '') == '': + print(f"ERROR: No $DISPLAY environment. " + f"Must supply argument to run without X", file=sys.stderr) + sys.exit(1) + # os.environ.__setitem__('DISPLAY', ':0.0') + + root = tk.Tk() + root.resizable(width=True, height=True) + signal.signal(signal.SIGINT, sigint_handler) + # Stop program if Esc key pressed + root.bind('', lambda _: root.quit()) + # Stop program if window is closed at OS level ('X' in upper right + # corner or red dot in upper left on Mac) + root.protocol("WM_DELETE_WINDOW", lambda: root.quit()) + UsbWindow(root) + root.mainloop() + sys.exit(0)