diff --git a/src/escpos/printer/cups.py b/src/escpos/printer/cups.py index a870b9b..192566b 100644 --- a/src/escpos/printer/cups.py +++ b/src/escpos/printer/cups.py @@ -9,9 +9,12 @@ """ import functools +import logging import tempfile +from typing import Optional, Type from ..escpos import Escpos +from ..exceptions import DeviceNotFoundError #: keeps track if the pycups dependency could be loaded (:py:class:`escpos.printer.CupsPrinter`) _DEP_PYCUPS = False @@ -24,7 +27,6 @@ try: DEFAULT_HOST = cups.getServer() DEFAULT_PORT = cups.getPort() except ImportError: - print("Error") pass @@ -103,28 +105,61 @@ class CupsPrinter(Escpos): self.pending_job = False @property - def printers(self): + def printers(self) -> dict: """Available CUPS printers.""" - return self.device.getPrinters() + if self.device: + return self.device.getPrinters() + return {} - def open(self, job_name="python-escpos"): + def open( + self, job_name: str = "python-escpos", raise_not_found: bool = True + ) -> None: """Set up a new print job and target the printer. A call to this method is required to send new jobs to - the same CUPS connection after close. + the CUPS connection after close. Defaults to default CUPS printer. Creates a new temporary file buffer. + + By default raise an exception if device is not found. + + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` """ + if self._device: + self.close() + cups.setServer(self.host) cups.setPort(self.port) - self.device = cups.Connection() self.job_name = job_name - if self.printer_name not in self.printers: - self.printer_name = self.device.getDefault() if self.tmpfile.closed: self.tmpfile = tempfile.NamedTemporaryFile(delete=True) + try: + # Open device + self.device: Optional[Type[cups.Connection]] = cups.Connection() + if self.device: + # Name validation, set default if no given name + self.printer_name = self.printer_name or self.device.getDefault() + assert self.printer_name in self.printers, "Incorrect printer name" + except (RuntimeError, AssertionError) as e: + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Unable to start a print job for the printer {self.printer_name}:" + + f"\n{e}" + ) + else: + logging.error( + "CupsPrinter printing %s not available", self.printer_name + ) + return + logging.info("CupsPrinter printer enabled") + def _raw(self, msg): """Append any command sent in raw format to temporary file. @@ -181,5 +216,5 @@ class CupsPrinter(Escpos): return if self.pending_job: self.send() - print(f"Closing CUPS connection to printer {self.printer_name}") + logging.info("Closing CUPS connection to printer %s", self.printer_name) self._device = False diff --git a/src/escpos/printer/file.py b/src/escpos/printer/file.py index 5fa24d6..c1c3b5a 100644 --- a/src/escpos/printer/file.py +++ b/src/escpos/printer/file.py @@ -8,7 +8,10 @@ :license: MIT """ +import logging + from ..escpos import Escpos +from ..exceptions import DeviceNotFoundError def is_usable() -> bool: @@ -39,9 +42,7 @@ class File(Escpos): """ return is_usable() - def __init__( - self, devfile: str = "/dev/usb/lp0", auto_flush: bool = True, *args, **kwargs - ): + def __init__(self, devfile: str = "", auto_flush: bool = True, *args, **kwargs): """Initialize file printer with device file. :param devfile: Device file under dev filesystem @@ -51,12 +52,33 @@ class File(Escpos): self.devfile = devfile self.auto_flush = auto_flush - def open(self): - """Open system file.""" - self.device = open(self.devfile, "wb") + def open(self, raise_not_found: bool = True) -> None: + """Open system file. - if not self.device: - print("Could not open the specified file {0}".format(self.devfile)) + By default raise an exception if device is not found. + + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` + """ + if self._device: + self.close() + + try: + # Open device + self.device = open(self.devfile, "wb") + except OSError as e: + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Could not open the specified file {self.devfile}:\n{e}" + ) + else: + logging.error("File printer %s not found", self.devfile) + return + logging.info("File printer enabled") def flush(self): """Flush printing content.""" @@ -76,6 +98,7 @@ class File(Escpos): """Close system file.""" if not self._device: return + logging.info("Closing File connection to printer %s", self.devfile) self.device.flush() self.device.close() self._device = False diff --git a/src/escpos/printer/lp.py b/src/escpos/printer/lp.py index 02971e9..f32c9c4 100644 --- a/src/escpos/printer/lp.py +++ b/src/escpos/printer/lp.py @@ -9,11 +9,13 @@ """ import functools -import os +import logging import subprocess import sys +from typing import ByteString from ..escpos import Escpos +from ..exceptions import DeviceNotFoundError def is_usable() -> bool: @@ -74,18 +76,48 @@ class LP(Escpos): self.auto_flush = kwargs.get("auto_flush", True) @dependency_linux_lp - def open(self): - """Invoke _lp_ in a new subprocess and wait for commands.""" + def open(self, raise_not_found: bool = True) -> None: + """Invoke _lp_ in a new subprocess and wait for commands. + + By default raise an exception if device is not found. + + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` + """ + if self._device: + self.close() + + # Open device self.device = subprocess.Popen( ["lp", "-d", self.printer_name, "-o", "raw"], stdin=subprocess.PIPE, - stdout=open(os.devnull, "w"), + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, ) + error: ByteString = b"" + if self.device and self.device.stderr: + error = self.device.stderr.read() + if bool(error): + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Unable to start a print job for the printer {self.printer_name}:" + + f"\n{error!r}" + ) + else: + logging.error("LP printing %s not available", self.printer_name) + return + logging.info("LP printer enabled") + def close(self): """Stop the subprocess.""" if not self._device: return + logging.info("Closing LP connection to printer %s", self.printer_name) self.device.terminate() self._device = False diff --git a/src/escpos/printer/network.py b/src/escpos/printer/network.py index f6b875a..360fcff 100644 --- a/src/escpos/printer/network.py +++ b/src/escpos/printer/network.py @@ -8,10 +8,12 @@ :license: MIT """ +import logging import socket from typing import Union from ..escpos import Escpos +from ..exceptions import DeviceNotFoundError def is_usable() -> bool: @@ -61,7 +63,7 @@ class Network(Escpos): port: int = 9100, timeout: Union[int, float] = 60, *args, - **kwargs + **kwargs, ): """Initialize network printer. @@ -74,14 +76,35 @@ class Network(Escpos): self.port = port self.timeout = timeout - def open(self): - """Open TCP socket with ``socket``-library and set it as escpos device.""" - self.device = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.device.settimeout(self.timeout) - self.device.connect((self.host, self.port)) + def open(self, raise_not_found: bool = True) -> None: + """Open TCP socket with ``socket``-library and set it as escpos device. - if not self.device: - print("Could not open socket for {0}".format(self.host)) + By default raise an exception if device is not found. + + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` + """ + if self._device: + self.close() + + try: + # Open device + self.device = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.device.settimeout(self.timeout) + self.device.connect((self.host, self.port)) + except OSError as e: + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Could not open socket for {self.host}:\n{e}" + ) + else: + logging.error("Network device %s not found", self.host) + return + logging.info("Network printer enabled") def _raw(self, msg): """Print any command sent in raw format. @@ -99,6 +122,7 @@ class Network(Escpos): """Close TCP connection.""" if not self._device: return + logging.info("Closing Network connection to printer %s", self.host) try: self.device.shutdown(socket.SHUT_RDWR) except socket.error: diff --git a/src/escpos/printer/serial.py b/src/escpos/printer/serial.py index b90d9e8..bf093a9 100644 --- a/src/escpos/printer/serial.py +++ b/src/escpos/printer/serial.py @@ -10,9 +10,11 @@ import functools -from typing import Optional, Union +import logging +from typing import Optional, Type, Union from ..escpos import Escpos +from ..exceptions import DeviceNotFoundError #: keeps track if the pyserial dependency could be loaded (:py:class:`escpos.printer.Serial`) _DEP_PYSERIAL = False @@ -20,6 +22,8 @@ _DEP_PYSERIAL = False try: import serial + # import serial.SerialException + _DEP_PYSERIAL = True except ImportError: pass @@ -74,7 +78,7 @@ class Serial(Escpos): @dependency_pyserial def __init__( self, - devfile: str = "/dev/ttyS0", + devfile: str = "", baudrate: int = 9600, bytesize: int = 8, timeout: Union[int, float] = 1, @@ -83,7 +87,7 @@ class Serial(Escpos): xonxoff: bool = False, dsrdtr: bool = True, *args, - **kwargs + **kwargs, ): """Initialize serial printer. @@ -113,25 +117,42 @@ class Serial(Escpos): self.dsrdtr = dsrdtr @dependency_pyserial - def open(self): - """Set up serial port and set is as escpos device.""" - if self.device and self.device.is_open: - self.close() - self.device = serial.Serial( - port=self.devfile, - baudrate=self.baudrate, - bytesize=self.bytesize, - parity=self.parity, - stopbits=self.stopbits, - timeout=self.timeout, - xonxoff=self.xonxoff, - dsrdtr=self.dsrdtr, - ) + def open(self, raise_not_found: bool = True) -> None: + """Set up serial port and set is as escpos device. - if self.device: - print("Serial printer enabled") - else: - print("Unable to open serial printer on: {0}".format(str(self.devfile))) + By default raise an exception if device is not found. + + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` + """ + if self._device and self.device.is_open: + self.close() + + try: + # Open device + self.device: Optional[Type[serial.Serial]] = serial.Serial( + port=self.devfile, + baudrate=self.baudrate, + bytesize=self.bytesize, + parity=self.parity, + stopbits=self.stopbits, + timeout=self.timeout, + xonxoff=self.xonxoff, + dsrdtr=self.dsrdtr, + ) + except (ValueError, serial.SerialException) as e: + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Unable to open serial printer on {self.devfile}:\n{e}" + ) + else: + logging.error("Serial device %s not found", self.devfile) + return + logging.info("Serial printer enabled") def _raw(self, msg): """Print any command sent in raw format. @@ -149,6 +170,7 @@ class Serial(Escpos): """Close Serial interface.""" if not self._device: return + logging.info("Closing Serial connection to printer %s", self.devfile) if self.device.is_open: self.device.flush() self.device.close() diff --git a/src/escpos/printer/usb.py b/src/escpos/printer/usb.py index 334b241..47fc0ee 100644 --- a/src/escpos/printer/usb.py +++ b/src/escpos/printer/usb.py @@ -8,10 +8,11 @@ :license: MIT """ import functools -from typing import Dict, Union +import logging +from typing import Dict, Optional, Type, Union from ..escpos import Escpos -from ..exceptions import USBNotFoundError +from ..exceptions import DeviceNotFoundError, USBNotFoundError #: keeps track if the usb dependency could be loaded (:py:class:`escpos.printer.Usb`) _DEP_USB = False @@ -80,7 +81,7 @@ class Usb(Escpos): in_ep: int = 0x82, out_ep: int = 0x01, *args, - **kwargs + **kwargs, ): """Initialize USB printer. @@ -103,21 +104,54 @@ class Usb(Escpos): self.usb_args["idProduct"] = idProduct @dependency_usb - def open(self): - """Search device on USB tree and set it as escpos device.""" - self.device = usb.core.find(**self.usb_args) - if self.device is None: - raise USBNotFoundError("Device not found or cable not plugged in.") + def open(self, raise_not_found: bool = True) -> None: + """Search device on USB tree and set it as escpos device. - self.idVendor = self.device.idVendor - self.idProduct = self.device.idProduct + By default raise an exception if device is not found. - # pyusb has three backends: libusb0, libusb1 and openusb but - # only libusb1 backend implements the methods is_kernel_driver_active() - # and detach_kernel_driver(). - # This helps enable this library to work on Windows. - if self.device.backend.__module__.endswith("libusb1"): - check_driver = None + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` + :raises: :py:exc:`~escpos.exceptions.USBNotFoundError` + """ + if self._device: + self.close() + + # Open device + try: + self.device: Optional[Type[usb.core.Device]] = usb.core.find( + **self.usb_args + ) + assert self.device, USBNotFoundError( + f"Device {tuple(self.usb_args.values())} not found" + + " or cable not plugged in." + ) + self._check_driver() + self._configure_usb() + except (AssertionError, usb.core.USBError) as e: + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Unable to open USB printer on {tuple(self.usb_args.values())}:" + + f"\n{e}" + ) + else: + logging.error("USB device %s not found", tuple(self.usb_args.values())) + return + logging.info("USB printer enabled") + + def _check_driver(self) -> None: + """Check the driver. + + pyusb has three backends: libusb0, libusb1 and openusb but + only libusb1 backend implements the methods is_kernel_driver_active() + and detach_kernel_driver(). + This helps enable this library to work on Windows. + """ + if self._device and self.device.backend.__module__.endswith("libusb1"): + check_driver: Optional[bool] = None try: check_driver = self.device.is_kernel_driver_active(0) @@ -131,13 +165,17 @@ class Usb(Escpos): pass except usb.core.USBError as e: if check_driver is not None: - print("Could not detatch kernel driver: {0}".format(str(e))) + logging.error("Could not detatch kernel driver: %s", str(e)) + def _configure_usb(self) -> None: + """Configure USB.""" + if not self.device: + return try: self.device.set_configuration() self.device.reset() except usb.core.USBError as e: - print("Could not set configuration: {0}".format(str(e))) + logging.error("Could not set configuration: %s", str(e)) def _raw(self, msg): """Print any command sent in raw format. @@ -156,5 +194,8 @@ class Usb(Escpos): """Release USB interface.""" if not self._device: return + logging.info( + "Closing Usb connection to printer %s", tuple(self.usb_args.values()) + ) usb.util.dispose_resources(self.device) self._device = False diff --git a/src/escpos/printer/win32raw.py b/src/escpos/printer/win32raw.py index c71fc00..4f1959e 100644 --- a/src/escpos/printer/win32raw.py +++ b/src/escpos/printer/win32raw.py @@ -9,13 +9,16 @@ """ import functools +import logging from ..escpos import Escpos +from ..exceptions import DeviceNotFoundError #: keeps track if the win32print dependency could be loaded (:py:class:`escpos.printer.Win32Raw`) _DEP_WIN32PRINT = False try: + import pywintypes import win32print _DEP_WIN32PRINT = True @@ -73,27 +76,66 @@ class Win32Raw(Escpos): def __init__(self, printer_name: str = "", *args, **kwargs): """Initialize default printer.""" Escpos.__init__(self, *args, **kwargs) - if printer_name is not None: - self.printer_name = printer_name - else: - self.printer_name = win32print.GetDefaultPrinter() + self.printer_name = printer_name + self.job_name = "" + + @property + def printers(self) -> dict: + """Available Windows printers.""" + return { + printer["pPrinterName"]: printer + for printer in win32print.EnumPrinters( + win32print.PRINTER_ENUM_NAME, None, 4 + ) + } @dependency_win32print - def open(self, job_name="python-escpos"): - """Open connection to default printer.""" - if self.printer_name is None: - raise Exception("Printer not found") - self.device = win32print.OpenPrinter(self.printer_name) - self.current_job = win32print.StartDocPrinter( - self.device, 1, (job_name, None, "RAW") - ) - win32print.StartPagePrinter(self.device) + def open( + self, job_name: str = "python-escpos", raise_not_found: bool = True + ) -> None: + """Open connection to default printer. + + By default raise an exception if device is not found. + + :param raise_not_found: Default True. + False to log error but do not raise exception. + + :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError` + """ + if self._device: + self.close() + + self.job_name = job_name + try: + # Name validation, set default if no given name + self.printer_name = self.printer_name or win32print.GetDefaultPrinter() + assert self.printer_name in self.printers, "Incorrect printer name" + # Open device + self.device = win32print.OpenPrinter(self.printer_name) + if self.device: + self.current_job = win32print.StartDocPrinter( + self.device, 1, (job_name, None, "RAW") + ) + win32print.StartPagePrinter(self.device) + except (AssertionError, pywintypes.error) as e: + # Raise exception or log error and cancel + self.device = None + if raise_not_found: + raise DeviceNotFoundError( + f"Unable to start a print job for the printer {self.printer_name}:" + + f"\n{e}" + ) + else: + logging.error("Win32Raw printing %s not available", self.printer_name) + return + logging.info("Win32Raw printer enabled") @dependency_win32print def close(self): """Close connection to default printer.""" if not self._device: return + logging.info("Closing Win32Raw connection to printer %s", self.printer_name) win32print.EndPagePrinter(self.device) win32print.EndDocPrinter(self.device) win32print.ClosePrinter(self.device)