diff --git a/src/escpos/escpos.py b/src/escpos/escpos.py index 07fd887..f66a13b 100644 --- a/src/escpos/escpos.py +++ b/src/escpos/escpos.py @@ -14,7 +14,7 @@ This module contains the abstract base class :py:class:`Escpos`. import textwrap from abc import ABCMeta, abstractmethod # abstract base class support from re import match as re_match -from typing import List, Optional, Union +from typing import List, Literal, Optional, Union import barcode import qrcode @@ -114,7 +114,11 @@ class Escpos(object): class. """ - device = None + # device status: + # False -> Not initialized + # None -> Initialized but not connected + # object -> The connection object (Usb(), Serial(), Network(), etc.) + _device: Union[Literal[False], Literal[None], object] = False def __init__(self, profile=None, magic_encode_args=None, **kwargs) -> None: """Initialize ESCPOS Printer. @@ -128,6 +132,27 @@ class Escpos(object): """Call self.close upon deletion.""" self.close() + @property + def device(self) -> Union[Literal[None], object]: + """Implements a self-open mechanism. + + An attempt to get the property before open the connection + will cause the connection to open. + """ + if self._device is False: + # Open device if not previously opened + self._device = None # None -> Initialized + self.open() + return self._device + + @device.setter + def device(self, new_device: Union[Literal[False], Literal[None], object]): + self._device = new_device + + def open(self): + """Open a printer device/connection.""" + pass + @abstractmethod def _raw(self, msg: bytes) -> None: """Send raw data to the printer. diff --git a/src/escpos/exceptions.py b/src/escpos/exceptions.py index 5cbe352..a583467 100644 --- a/src/escpos/exceptions.py +++ b/src/escpos/exceptions.py @@ -13,7 +13,8 @@ Result/Exit codes: - `60` = Invalid pin to send Cash Drawer pulse :py:exc:`~escpos.exceptions.CashDrawerError` - `70` = Invalid number of tab positions :py:exc:`~escpos.exceptions.TabPosError` - `80` = Invalid char code :py:exc:`~escpos.exceptions.CharCodeError` - - `90` = USB device not found :py:exc:`~escpos.exceptions.USBNotFoundError` + - `90` = Device not found :py:exc:`~escpos.exceptions.DeviceNotFoundError` + - `91` = USB device not found :py:exc:`~escpos.exceptions.USBNotFoundError` - `100` = Set variable out of range :py:exc:`~escpos.exceptions.SetVariableError` - `200` = Configuration not found :py:exc:`~escpos.exceptions.ConfigNotFoundError` - `210` = Configuration syntax error :py:exc:`~escpos.exceptions.ConfigSyntaxError` @@ -275,11 +276,35 @@ class CharCodeError(Error): return "Valid char code must be set ({msg})".format(msg=self.msg) -class USBNotFoundError(Error): - """Device was not found (probably not plugged in). +class DeviceNotFoundError(Error): + """Device was not found. + + The device seems to be not accessible. + The return code for this exception is `90`. + + inheritance: + + .. inheritance-diagram:: escpos.exceptions.Error + :parts: 1 + + """ + + def __init__(self, msg=""): + """Initialize DeviceNotFoundError object.""" + Error.__init__(self, msg) + self.msg = msg + self.resultcode = 90 + + def __str__(self): + """Return string representation of DeviceNotFoundError.""" + return f"Device not found ({self.msg})" + + +class USBNotFoundError(DeviceNotFoundError): + """USB device was not found (probably not plugged in). The USB device seems to be not plugged in. - The return code for this exception is `90`. + The return code for this exception is `91`. inheritance: @@ -292,11 +317,11 @@ class USBNotFoundError(Error): """Initialize USBNotFoundError object.""" Error.__init__(self, msg) self.msg = msg - self.resultcode = 90 + self.resultcode = 91 def __str__(self): """Return string representation of USBNotFoundError.""" - return "USB device not found ({msg})".format(msg=self.msg) + return f"USB device not found ({self.msg})" class SetVariableError(Error): diff --git a/src/escpos/printer/cups.py b/src/escpos/printer/cups.py index 60e8468..0625b01 100644 --- a/src/escpos/printer/cups.py +++ b/src/escpos/printer/cups.py @@ -9,9 +9,12 @@ """ import functools +import logging import tempfile +from typing import Literal, Optional, Type, Union from ..escpos import Escpos +from ..exceptions import DeviceNotFoundError #: keeps track if the pycups dependency could be loaded (:py:class:`escpos.printer.CupsPrinter`) _DEP_PYCUPS = False @@ -20,6 +23,9 @@ try: import cups _DEP_PYCUPS = True + # Store server defaults before further configuration + DEFAULT_HOST = cups.getServer() + DEFAULT_PORT = cups.getPort() except ImportError: pass @@ -78,48 +84,84 @@ class CupsPrinter(Escpos): return is_usable() @dependency_pycups - def __init__(self, printer_name=None, *args, **kwargs): + def __init__(self, printer_name: str = "", *args, **kwargs): """Class constructor for CupsPrinter. :param printer_name: CUPS printer name (Optional) - :type printer_name: str :param host: CUPS server host/ip (Optional) :type host: str :param port: CUPS server port (Optional) :type port: int """ Escpos.__init__(self, *args, **kwargs) - host, port = args or ( - kwargs.get("host", cups.getServer()), - kwargs.get("port", cups.getPort()), + self.host, self.port = args or ( + kwargs.get("host", DEFAULT_HOST), + kwargs.get("port", DEFAULT_PORT), ) - cups.setServer(host) - cups.setPort(port) - self.conn = cups.Connection() - self.tmpfile = None + self.tmpfile = tempfile.NamedTemporaryFile(delete=True) self.printer_name = printer_name self.job_name = "" self.pending_job = False - self.open() + + self._device: Union[ + Literal[False], Literal[None], Type[cups.Connection] + ] = False @property - def printers(self): + def printers(self) -> dict: """Available CUPS printers.""" - return self.conn.getPrinters() + if self.device: + return self.device.getPrinters() + return {} - def open(self, job_name="python-escpos"): + def open( + self, job_name: str = "python-escpos", raise_not_found: bool = True + ) -> None: """Set up a new print job and target the printer. A call to this method is required to send new jobs to - the same CUPS connection. + the CUPS connection after close. Defaults to default CUPS printer. Creates a new temporary file buffer. + + By default raise an exception if device is not found. + + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` """ + if self._device: + self.close() + + cups.setServer(self.host) + cups.setPort(self.port) self.job_name = job_name - if self.printer_name not in self.printers: - self.printer_name = self.conn.getDefault() - self.tmpfile = tempfile.NamedTemporaryFile(delete=True) + if self.tmpfile.closed: + self.tmpfile = tempfile.NamedTemporaryFile(delete=True) + + try: + # Open device + self.device: Optional[Type[cups.Connection]] = cups.Connection() + if self.device: + # Name validation, set default if no given name + self.printer_name = self.printer_name or self.device.getDefault() + assert self.printer_name in self.printers, "Incorrect printer name" + except (RuntimeError, AssertionError) as e: + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Unable to start a print job for the printer {self.printer_name}:" + + f"\n{e}" + ) + else: + logging.error( + "CupsPrinter printing %s not available", self.printer_name + ) + return + logging.info("CupsPrinter printer enabled") def _raw(self, msg): """Append any command sent in raw format to temporary file. @@ -134,14 +176,13 @@ class CupsPrinter(Escpos): self.pending_job = False raise ValueError("Printer job not opened") - @dependency_pycups def send(self): """Send the print job to the printer.""" if self.pending_job: # Rewind tempfile self.tmpfile.seek(0) # Print temporary file via CUPS printer. - self.conn.printFile( + self.device.printFile( self.printer_name, self.tmpfile.name, self.job_name, @@ -173,8 +214,9 @@ class CupsPrinter(Escpos): Send pending job to the printer if needed. """ + if not self._device: + return if self.pending_job: self.send() - if self.conn: - print("Closing CUPS connection to printer {}".format(self.printer_name)) - self.conn = None + logging.info("Closing CUPS connection to printer %s", self.printer_name) + self._device = False diff --git a/src/escpos/printer/file.py b/src/escpos/printer/file.py index acf4802..67cbe4f 100644 --- a/src/escpos/printer/file.py +++ b/src/escpos/printer/file.py @@ -1,6 +1,6 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -"""This module contains the implementation of the CupsPrinter printer driver. +"""This module contains the implementation of the File printer driver. :author: python-escpos developers :organization: `python-escpos `_ @@ -8,7 +8,11 @@ :license: MIT """ +import logging +from typing import IO, Literal, Optional, Union + from ..escpos import Escpos +from ..exceptions import DeviceNotFoundError def is_usable() -> bool: @@ -39,7 +43,7 @@ class File(Escpos): """ return is_usable() - def __init__(self, devfile="/dev/usb/lp0", auto_flush=True, *args, **kwargs): + def __init__(self, devfile: str = "", auto_flush: bool = True, *args, **kwargs): """Initialize file printer with device file. :param devfile: Device file under dev filesystem @@ -48,18 +52,41 @@ class File(Escpos): Escpos.__init__(self, *args, **kwargs) self.devfile = devfile self.auto_flush = auto_flush - self.open() - def open(self): - """Open system file.""" - self.device = open(self.devfile, "wb") + self._device: Union[Literal[False], Literal[None], IO[bytes]] = False - if self.device is None: - print("Could not open the specified file {0}".format(self.devfile)) + def open(self, raise_not_found: bool = True) -> None: + """Open system file. - def flush(self): + By default raise an exception if device is not found. + + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` + """ + if self._device: + self.close() + + try: + # Open device + self.device: Optional[IO[bytes]] = open(self.devfile, "wb") + except OSError as e: + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Could not open the specified file {self.devfile}:\n{e}" + ) + else: + logging.error("File printer %s not found", self.devfile) + return + logging.info("File printer enabled") + + def flush(self) -> None: """Flush printing content.""" - self.device.flush() + if self.device: + self.device.flush() def _raw(self, msg): """Print any command sent in raw format. @@ -71,8 +98,12 @@ class File(Escpos): if self.auto_flush: self.flush() - def close(self): + def close(self) -> None: """Close system file.""" - if self.device is not None: - self.device.flush() - self.device.close() + if not self._device: + return + logging.info("Closing File connection to printer %s", self.devfile) + if not self.auto_flush: + self.flush() + self._device.close() + self._device = False diff --git a/src/escpos/printer/lp.py b/src/escpos/printer/lp.py index 1b7ed54..951df66 100644 --- a/src/escpos/printer/lp.py +++ b/src/escpos/printer/lp.py @@ -1,6 +1,6 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -"""This module contains the implementation of the CupsPrinter printer driver. +"""This module contains the implementation of the LP printer driver. :author: python-escpos developers :organization: `python-escpos `_ @@ -9,11 +9,13 @@ """ import functools -import os +import logging import subprocess import sys +from typing import Literal, Optional, Union from ..escpos import Escpos +from ..exceptions import DeviceNotFoundError def is_usable() -> bool: @@ -61,40 +63,124 @@ class LP(Escpos): """ return is_usable() - def __init__(self, printer_name: str, *args, **kwargs): + @dependency_linux_lp + def __init__(self, printer_name: str = "", *args, **kwargs): """LP class constructor. :param printer_name: CUPS printer name (Optional) - :type printer_name: str :param auto_flush: Automatic flush after every _raw() (Optional) - :type auto_flush: bool + :type auto_flush: bool (Defaults False) """ Escpos.__init__(self, *args, **kwargs) self.printer_name = printer_name - self.auto_flush = kwargs.get("auto_flush", True) - self.open() + self.auto_flush = kwargs.get("auto_flush", False) + self._flushed = False - @dependency_linux_lp - def open(self): - """Invoke _lp_ in a new subprocess and wait for commands.""" - self.lp = subprocess.Popen( - ["lp", "-d", self.printer_name, "-o", "raw"], - stdin=subprocess.PIPE, - stdout=open(os.devnull, "w"), + self._device: Union[Literal[False], Literal[None], subprocess.Popen] = False + + @property + def printers(self) -> dict: + """Available CUPS printers.""" + p_names = subprocess.run( + ["lpstat", "-e"], # Get printer names + capture_output=True, + text=True, ) + p_devs = subprocess.run( + ["lpstat", "-v"], # Get attached devices + capture_output=True, + text=True, + ) + # List and trim output lines + names = [name for name in p_names.stdout.split("\n") if name] + devs = [dev for dev in p_devs.stdout.split("\n") if dev] + # return a dict of {printer name: attached device} pairs + return {name: dev.split()[-1] for name in names for dev in devs if name in dev} - def close(self): + def _get_system_default_printer(self) -> str: + """Return the system's default printer name.""" + p_name = subprocess.run( + ["lpstat", "-d"], + capture_output=True, + text=True, + ) + name = p_name.stdout.split()[-1] + if name not in self.printers: + return "" + return name + + def open( + self, + job_name: str = "python-escpos", + raise_not_found: bool = True, + _close_opened: bool = True, + ) -> None: + """Invoke _lp_ in a new subprocess and wait for commands. + + By default raise an exception if device is not found. + + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` + """ + if self._device and _close_opened: + self.close() + + self._is_closing = False + + self.job_name = job_name + try: + # Name validation, set default if no given name + self.printer_name = self.printer_name or self._get_system_default_printer() + assert self.printer_name in self.printers, "Incorrect printer name" + # Open device + self.device: Optional[subprocess.Popen] = subprocess.Popen( + ["lp", "-d", self.printer_name, "-t", self.job_name, "-o", "raw"], + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + except (AssertionError, subprocess.SubprocessError) as e: + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Unable to start a print job for the printer {self.printer_name}:" + + f"\n{e}" + ) + else: + logging.error("LP printing %s not available", self.printer_name) + return + logging.info("LP printer enabled") + + def close(self) -> None: """Stop the subprocess.""" - self.lp.terminate() + if not self._device: + return + logging.info("Closing LP connection to printer %s", self.printer_name) + self._is_closing = True + if not self.auto_flush: + self.flush() + self._device.terminate() + self._device = False - def flush(self): + def flush(self) -> None: """End line and wait for new commands.""" - if self.lp.stdin.writable(): - self.lp.stdin.write(b"\n") - if self.lp.stdin.closed is False: - self.lp.stdin.close() - self.lp.wait() - self.open() + if not self.device or not self.device.stdin: + return + + if self._flushed: + return + + if self.device.stdin.writable(): + self.device.stdin.write(b"\n") + if self.device.stdin.closed is False: + self.device.stdin.close() + self.device.wait() + self._flushed = True + if not self._is_closing: + self.open(_close_opened=False) def _raw(self, msg): """Write raw command(s) to the printer. @@ -102,9 +188,10 @@ class LP(Escpos): :param msg: arbitrary code to be printed :type msg: bytes """ - if self.lp.stdin.writable(): - self.lp.stdin.write(msg) + if self.device.stdin.writable(): + self.device.stdin.write(msg) else: - raise Exception("Not a valid pipe for lp process") + raise subprocess.SubprocessError("Not a valid pipe for lp process") + self._flushed = False if self.auto_flush: self.flush() diff --git a/src/escpos/printer/network.py b/src/escpos/printer/network.py index 9e406fb..5b0d0fa 100644 --- a/src/escpos/printer/network.py +++ b/src/escpos/printer/network.py @@ -8,9 +8,12 @@ :license: MIT """ +import logging import socket +from typing import Literal, Optional, Union from ..escpos import Escpos +from ..exceptions import DeviceNotFoundError def is_usable() -> bool: @@ -54,7 +57,14 @@ class Network(Escpos): """ return is_usable() - def __init__(self, host, port=9100, timeout=60, *args, **kwargs): + def __init__( + self, + host: str = "", + port: int = 9100, + timeout: Union[int, float] = 60, + *args, + **kwargs, + ): """Initialize network printer. :param host: Printer's host name or IP address @@ -65,16 +75,40 @@ class Network(Escpos): self.host = host self.port = port self.timeout = timeout - self.open() - def open(self): - """Open TCP socket with ``socket``-library and set it as escpos device.""" - self.device = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.device.settimeout(self.timeout) - self.device.connect((self.host, self.port)) + self._device: Union[Literal[False], Literal[None], socket.socket] = False - if self.device is None: - print("Could not open socket for {0}".format(self.host)) + def open(self, raise_not_found: bool = True) -> None: + """Open TCP socket with ``socket``-library and set it as escpos device. + + By default raise an exception if device is not found. + + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` + """ + if self._device: + self.close() + + try: + # Open device + self.device: Optional[socket.socket] = socket.socket( + socket.AF_INET, socket.SOCK_STREAM + ) + self.device.settimeout(self.timeout) + self.device.connect((self.host, self.port)) + except OSError as e: + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Could not open socket for {self.host}:\n{e}" + ) + else: + logging.error("Network device %s not found", self.host) + return + logging.info("Network printer enabled") def _raw(self, msg): """Print any command sent in raw format. @@ -88,11 +122,14 @@ class Network(Escpos): """Read data from the TCP socket.""" return self.device.recv(16) - def close(self): + def close(self) -> None: """Close TCP connection.""" - if self.device is not None: - try: - self.device.shutdown(socket.SHUT_RDWR) - except socket.error: - pass - self.device.close() + if not self._device: + return + logging.info("Closing Network connection to printer %s", self.host) + try: + self._device.shutdown(socket.SHUT_RDWR) + except socket.error: + pass + self._device.close() + self._device = False diff --git a/src/escpos/printer/serial.py b/src/escpos/printer/serial.py index 0b80e12..bbe1d25 100644 --- a/src/escpos/printer/serial.py +++ b/src/escpos/printer/serial.py @@ -1,6 +1,6 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -"""This module contains the implementation of the CupsPrinter printer driver. +"""This module contains the implementation of the Serial printer driver. :author: python-escpos developers :organization: `python-escpos `_ @@ -10,8 +10,11 @@ import functools +import logging +from typing import Literal, Optional, Union from ..escpos import Escpos +from ..exceptions import DeviceNotFoundError #: keeps track if the pyserial dependency could be loaded (:py:class:`escpos.printer.Serial`) _DEP_PYSERIAL = False @@ -73,16 +76,16 @@ class Serial(Escpos): @dependency_pyserial def __init__( self, - devfile="/dev/ttyS0", - baudrate=9600, - bytesize=8, - timeout=1, - parity=None, - stopbits=None, - xonxoff=False, - dsrdtr=True, + devfile: str = "", + baudrate: int = 9600, + bytesize: int = 8, + timeout: Union[int, float] = 1, + parity: Optional[str] = None, + stopbits: Optional[int] = None, + xonxoff: bool = False, + dsrdtr: bool = True, *args, - **kwargs + **kwargs, ): """Initialize serial printer. @@ -111,28 +114,46 @@ class Serial(Escpos): self.xonxoff = xonxoff self.dsrdtr = dsrdtr - self.open() + self._device: Union[Literal[False], Literal[None], serial.Serial] = False @dependency_pyserial - def open(self): - """Set up serial port and set is as escpos device.""" - if self.device is not None and self.device.is_open: - self.close() - self.device = serial.Serial( - port=self.devfile, - baudrate=self.baudrate, - bytesize=self.bytesize, - parity=self.parity, - stopbits=self.stopbits, - timeout=self.timeout, - xonxoff=self.xonxoff, - dsrdtr=self.dsrdtr, - ) + def open(self, raise_not_found: bool = True) -> None: + """Set up serial port and set is as escpos device. - if self.device is not None: - print("Serial printer enabled") - else: - print("Unable to open serial printer on: {0}".format(str(self.devfile))) + By default raise an exception if device is not found. + + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` + """ + if self._device: + if self.device and self.device.is_open: + self.close() + + try: + # Open device + self.device: Optional[serial.Serial] = serial.Serial( + port=self.devfile, + baudrate=self.baudrate, + bytesize=self.bytesize, + parity=self.parity, + stopbits=self.stopbits, + timeout=self.timeout, + xonxoff=self.xonxoff, + dsrdtr=self.dsrdtr, + ) + except (ValueError, serial.SerialException) as e: + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Unable to open serial printer on {self.devfile}:\n{e}" + ) + else: + logging.error("Serial device %s not found", self.devfile) + return + logging.info("Serial printer enabled") def _raw(self, msg): """Print any command sent in raw format. @@ -146,8 +167,12 @@ class Serial(Escpos): """Read the data buffer and return it to the caller.""" return self.device.read(16) - def close(self): + def close(self) -> None: """Close Serial interface.""" - if self.device is not None and self.device.is_open: - self.device.flush() - self.device.close() + if not self._device: + return + logging.info("Closing Serial connection to printer %s", self.devfile) + if self._device and self._device.is_open: + self._device.flush() + self._device.close() + self._device = False diff --git a/src/escpos/printer/usb.py b/src/escpos/printer/usb.py index 03f911f..d07df7b 100644 --- a/src/escpos/printer/usb.py +++ b/src/escpos/printer/usb.py @@ -8,9 +8,11 @@ :license: MIT """ import functools +import logging +from typing import Dict, Literal, Optional, Type, Union from ..escpos import Escpos -from ..exceptions import USBNotFoundError +from ..exceptions import DeviceNotFoundError, USBNotFoundError #: keeps track if the usb dependency could be loaded (:py:class:`escpos.printer.Usb`) _DEP_USB = False @@ -72,14 +74,14 @@ class Usb(Escpos): def __init__( self, - idVendor, - idProduct, - usb_args=None, - timeout=0, - in_ep=0x82, - out_ep=0x01, + idVendor: str = "", + idProduct: str = "", + usb_args: Dict[str, str] = {}, + timeout: Union[int, float] = 0, + in_ep: int = 0x82, + out_ep: int = 0x01, *args, - **kwargs + **kwargs, ): """Initialize USB printer. @@ -95,32 +97,65 @@ class Usb(Escpos): self.in_ep = in_ep self.out_ep = out_ep - usb_args = usb_args or {} + self.usb_args = usb_args or {} if idVendor: - usb_args["idVendor"] = idVendor + self.usb_args["idVendor"] = idVendor if idProduct: - usb_args["idProduct"] = idProduct - self.open(usb_args) + self.usb_args["idProduct"] = idProduct + + self._device: Union[ + Literal[False], Literal[None], Type[usb.core.Device] + ] = False @dependency_usb - def open(self, usb_args): + def open(self, raise_not_found: bool = True) -> None: """Search device on USB tree and set it as escpos device. - :param usb_args: USB arguments + By default raise an exception if device is not found. + + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` + :raises: :py:exc:`~escpos.exceptions.USBNotFoundError` """ - self.device = usb.core.find(**usb_args) - if self.device is None: - raise USBNotFoundError("Device not found or cable not plugged in.") + if self._device: + self.close() - self.idVendor = self.device.idVendor - self.idProduct = self.device.idProduct + # Open device + try: + self.device: Optional[Type[usb.core.Device]] = usb.core.find( + **self.usb_args + ) + assert self.device, USBNotFoundError( + f"Device {tuple(self.usb_args.values())} not found" + + " or cable not plugged in." + ) + self._check_driver() + self._configure_usb() + except (AssertionError, usb.core.USBError) as e: + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Unable to open USB printer on {tuple(self.usb_args.values())}:" + + f"\n{e}" + ) + else: + logging.error("USB device %s not found", tuple(self.usb_args.values())) + return + logging.info("USB printer enabled") - # pyusb has three backends: libusb0, libusb1 and openusb but - # only libusb1 backend implements the methods is_kernel_driver_active() - # and detach_kernel_driver(). - # This helps enable this library to work on Windows. - if self.device.backend.__module__.endswith("libusb1"): - check_driver = None + def _check_driver(self) -> None: + """Check the driver. + + pyusb has three backends: libusb0, libusb1 and openusb but + only libusb1 backend implements the methods is_kernel_driver_active() + and detach_kernel_driver(). + This helps enable this library to work on Windows. + """ + if self.device and self.device.backend.__module__.endswith("libusb1"): + check_driver: Optional[bool] = None try: check_driver = self.device.is_kernel_driver_active(0) @@ -134,13 +169,17 @@ class Usb(Escpos): pass except usb.core.USBError as e: if check_driver is not None: - print("Could not detatch kernel driver: {0}".format(str(e))) + logging.error("Could not detatch kernel driver: %s", str(e)) + def _configure_usb(self) -> None: + """Configure USB.""" + if not self.device: + return try: self.device.set_configuration() self.device.reset() except usb.core.USBError as e: - print("Could not set configuration: {0}".format(str(e))) + logging.error("Could not set configuration: %s", str(e)) def _raw(self, msg): """Print any command sent in raw format. @@ -155,8 +194,12 @@ class Usb(Escpos): return self.device.read(self.in_ep, 16) @dependency_usb - def close(self): + def close(self) -> None: """Release USB interface.""" - if self.device: - usb.util.dispose_resources(self.device) - self.device = None + if not self._device: + return + logging.info( + "Closing Usb connection to printer %s", tuple(self.usb_args.values()) + ) + usb.util.dispose_resources(self._device) + self._device = False diff --git a/src/escpos/printer/win32raw.py b/src/escpos/printer/win32raw.py index d28b8c2..396c3e9 100644 --- a/src/escpos/printer/win32raw.py +++ b/src/escpos/printer/win32raw.py @@ -1,6 +1,6 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -"""This module contains the implementation of the CupsPrinter printer driver. +"""This module contains the implementation of the Win32Raw printer driver. :author: python-escpos developers :organization: `python-escpos `_ @@ -9,12 +9,16 @@ """ import functools +import logging +from typing import Literal, Optional, Type, Union from ..escpos import Escpos +from ..exceptions import DeviceNotFoundError #: keeps track if the win32print dependency could be loaded (:py:class:`escpos.printer.Win32Raw`) _DEP_WIN32PRINT = False + try: import win32print @@ -70,38 +74,78 @@ class Win32Raw(Escpos): return is_usable() @dependency_win32print - def __init__(self, printer_name=None, *args, **kwargs): + def __init__(self, printer_name: str = "", *args, **kwargs): """Initialize default printer.""" Escpos.__init__(self, *args, **kwargs) - if printer_name is not None: - self.printer_name = printer_name - else: - self.printer_name = win32print.GetDefaultPrinter() - self.hPrinter = None - self.open() + self.printer_name = printer_name + self.job_name = "" - @dependency_win32print - def open(self, job_name="python-escpos"): - """Open connection to default printer.""" - if self.printer_name is None: - raise Exception("Printer not found") - self.hPrinter = win32print.OpenPrinter(self.printer_name) - self.current_job = win32print.StartDocPrinter( - self.hPrinter, 1, (job_name, None, "RAW") - ) - win32print.StartPagePrinter(self.hPrinter) + self._device: Union[ + Literal[False], + Literal[None], + Type[win32print.OpenPrinter], + ] = False - @dependency_win32print - def close(self): + @property + def printers(self) -> dict: + """Available Windows printers.""" + return { + printer["pPrinterName"]: printer + for printer in win32print.EnumPrinters(win32print.PRINTER_ENUM_NAME, "", 4) + } + + def open( + self, job_name: str = "python-escpos", raise_not_found: bool = True + ) -> None: + """Open connection to default printer. + + By default raise an exception if device is not found. + + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` + """ + if self._device: + self.close() + + self.job_name = job_name + try: + # Name validation, set default if no given name + self.printer_name = self.printer_name or win32print.GetDefaultPrinter() + assert self.printer_name in self.printers, "Incorrect printer name" + # Open device + self.device: Optional[ + Type[win32print.OpenPrinter] + ] = win32print.OpenPrinter(self.printer_name) + if self.device: + self.current_job = win32print.StartDocPrinter( + self.device, 1, (job_name, None, "RAW") + ) + win32print.StartPagePrinter(self.device) + except AssertionError as e: + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Unable to start a print job for the printer {self.printer_name}:" + + f"\n{e}" + ) + else: + logging.error("Win32Raw printing %s not available", self.printer_name) + return + logging.info("Win32Raw printer enabled") + + def close(self) -> None: """Close connection to default printer.""" - if not self.hPrinter: + if not self._device: return - win32print.EndPagePrinter(self.hPrinter) - win32print.EndDocPrinter(self.hPrinter) - win32print.ClosePrinter(self.hPrinter) - self.hPrinter = None + logging.info("Closing Win32Raw connection to printer %s", self.printer_name) + win32print.EndPagePrinter(self._device) + win32print.EndDocPrinter(self._device) + win32print.ClosePrinter(self._device) + self._device = False - @dependency_win32print def _raw(self, msg): """Print any command sent in raw format. @@ -110,6 +154,6 @@ class Win32Raw(Escpos): """ if self.printer_name is None: raise Exception("Printer not found") - if self.hPrinter is None: + if not self.device: raise Exception("Printer job not opened") - win32print.WritePrinter(self.hPrinter, msg) + win32print.WritePrinter(self.device, msg) diff --git a/tox.ini b/tox.ini index 08922e7..2abdad6 100644 --- a/tox.ini +++ b/tox.ini @@ -57,7 +57,6 @@ deps = mypy types-appdirs types-Pillow types-pyserial - types-pywin32 hypothesis>=6.83 jaconv commands = mypy src test