Enhance connectors consistency

This commit is contained in:
belono 2023-09-16 19:55:31 +02:00
parent 30a34460e8
commit 2bd11b38d5
7 changed files with 300 additions and 81 deletions

View File

@ -9,9 +9,12 @@
""" """
import functools import functools
import logging
import tempfile import tempfile
from typing import Optional, Type
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import DeviceNotFoundError
#: keeps track if the pycups dependency could be loaded (:py:class:`escpos.printer.CupsPrinter`) #: keeps track if the pycups dependency could be loaded (:py:class:`escpos.printer.CupsPrinter`)
_DEP_PYCUPS = False _DEP_PYCUPS = False
@ -24,7 +27,6 @@ try:
DEFAULT_HOST = cups.getServer() DEFAULT_HOST = cups.getServer()
DEFAULT_PORT = cups.getPort() DEFAULT_PORT = cups.getPort()
except ImportError: except ImportError:
print("Error")
pass pass
@ -103,28 +105,61 @@ class CupsPrinter(Escpos):
self.pending_job = False self.pending_job = False
@property @property
def printers(self): def printers(self) -> dict:
"""Available CUPS printers.""" """Available CUPS printers."""
if self.device:
return self.device.getPrinters() return self.device.getPrinters()
return {}
def open(self, job_name="python-escpos"): def open(
self, job_name: str = "python-escpos", raise_not_found: bool = True
) -> None:
"""Set up a new print job and target the printer. """Set up a new print job and target the printer.
A call to this method is required to send new jobs to A call to this method is required to send new jobs to
the same CUPS connection after close. the CUPS connection after close.
Defaults to default CUPS printer. Defaults to default CUPS printer.
Creates a new temporary file buffer. Creates a new temporary file buffer.
By default raise an exception if device is not found.
:param raise_not_found: Default True.
False to log error but do not raise exception.
:raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
""" """
if self._device:
self.close()
cups.setServer(self.host) cups.setServer(self.host)
cups.setPort(self.port) cups.setPort(self.port)
self.device = cups.Connection()
self.job_name = job_name self.job_name = job_name
if self.printer_name not in self.printers:
self.printer_name = self.device.getDefault()
if self.tmpfile.closed: if self.tmpfile.closed:
self.tmpfile = tempfile.NamedTemporaryFile(delete=True) self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
try:
# Open device
self.device: Optional[Type[cups.Connection]] = cups.Connection()
if self.device:
# Name validation, set default if no given name
self.printer_name = self.printer_name or self.device.getDefault()
assert self.printer_name in self.printers, "Incorrect printer name"
except (RuntimeError, AssertionError) as e:
# Raise exception or log error and cancel
self.device = None
if raise_not_found:
raise DeviceNotFoundError(
f"Unable to start a print job for the printer {self.printer_name}:"
+ f"\n{e}"
)
else:
logging.error(
"CupsPrinter printing %s not available", self.printer_name
)
return
logging.info("CupsPrinter printer enabled")
def _raw(self, msg): def _raw(self, msg):
"""Append any command sent in raw format to temporary file. """Append any command sent in raw format to temporary file.
@ -181,5 +216,5 @@ class CupsPrinter(Escpos):
return return
if self.pending_job: if self.pending_job:
self.send() self.send()
print(f"Closing CUPS connection to printer {self.printer_name}") logging.info("Closing CUPS connection to printer %s", self.printer_name)
self._device = False self._device = False

View File

@ -8,7 +8,10 @@
:license: MIT :license: MIT
""" """
import logging
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import DeviceNotFoundError
def is_usable() -> bool: def is_usable() -> bool:
@ -39,9 +42,7 @@ class File(Escpos):
""" """
return is_usable() return is_usable()
def __init__( def __init__(self, devfile: str = "", auto_flush: bool = True, *args, **kwargs):
self, devfile: str = "/dev/usb/lp0", auto_flush: bool = True, *args, **kwargs
):
"""Initialize file printer with device file. """Initialize file printer with device file.
:param devfile: Device file under dev filesystem :param devfile: Device file under dev filesystem
@ -51,12 +52,33 @@ class File(Escpos):
self.devfile = devfile self.devfile = devfile
self.auto_flush = auto_flush self.auto_flush = auto_flush
def open(self): def open(self, raise_not_found: bool = True) -> None:
"""Open system file.""" """Open system file.
self.device = open(self.devfile, "wb")
if not self.device: By default raise an exception if device is not found.
print("Could not open the specified file {0}".format(self.devfile))
:param raise_not_found: Default True.
False to log error but do not raise exception.
:raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
"""
if self._device:
self.close()
try:
# Open device
self.device = open(self.devfile, "wb")
except OSError as e:
# Raise exception or log error and cancel
self.device = None
if raise_not_found:
raise DeviceNotFoundError(
f"Could not open the specified file {self.devfile}:\n{e}"
)
else:
logging.error("File printer %s not found", self.devfile)
return
logging.info("File printer enabled")
def flush(self): def flush(self):
"""Flush printing content.""" """Flush printing content."""
@ -76,6 +98,7 @@ class File(Escpos):
"""Close system file.""" """Close system file."""
if not self._device: if not self._device:
return return
logging.info("Closing File connection to printer %s", self.devfile)
self.device.flush() self.device.flush()
self.device.close() self.device.close()
self._device = False self._device = False

View File

@ -9,11 +9,13 @@
""" """
import functools import functools
import os import logging
import subprocess import subprocess
import sys import sys
from typing import ByteString
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import DeviceNotFoundError
def is_usable() -> bool: def is_usable() -> bool:
@ -74,18 +76,48 @@ class LP(Escpos):
self.auto_flush = kwargs.get("auto_flush", True) self.auto_flush = kwargs.get("auto_flush", True)
@dependency_linux_lp @dependency_linux_lp
def open(self): def open(self, raise_not_found: bool = True) -> None:
"""Invoke _lp_ in a new subprocess and wait for commands.""" """Invoke _lp_ in a new subprocess and wait for commands.
By default raise an exception if device is not found.
:param raise_not_found: Default True.
False to log error but do not raise exception.
:raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
"""
if self._device:
self.close()
# Open device
self.device = subprocess.Popen( self.device = subprocess.Popen(
["lp", "-d", self.printer_name, "-o", "raw"], ["lp", "-d", self.printer_name, "-o", "raw"],
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=open(os.devnull, "w"), stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
) )
error: ByteString = b""
if self.device and self.device.stderr:
error = self.device.stderr.read()
if bool(error):
# Raise exception or log error and cancel
self.device = None
if raise_not_found:
raise DeviceNotFoundError(
f"Unable to start a print job for the printer {self.printer_name}:"
+ f"\n{error!r}"
)
else:
logging.error("LP printing %s not available", self.printer_name)
return
logging.info("LP printer enabled")
def close(self): def close(self):
"""Stop the subprocess.""" """Stop the subprocess."""
if not self._device: if not self._device:
return return
logging.info("Closing LP connection to printer %s", self.printer_name)
self.device.terminate() self.device.terminate()
self._device = False self._device = False

View File

@ -8,10 +8,12 @@
:license: MIT :license: MIT
""" """
import logging
import socket import socket
from typing import Union from typing import Union
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import DeviceNotFoundError
def is_usable() -> bool: def is_usable() -> bool:
@ -61,7 +63,7 @@ class Network(Escpos):
port: int = 9100, port: int = 9100,
timeout: Union[int, float] = 60, timeout: Union[int, float] = 60,
*args, *args,
**kwargs **kwargs,
): ):
"""Initialize network printer. """Initialize network printer.
@ -74,14 +76,35 @@ class Network(Escpos):
self.port = port self.port = port
self.timeout = timeout self.timeout = timeout
def open(self): def open(self, raise_not_found: bool = True) -> None:
"""Open TCP socket with ``socket``-library and set it as escpos device.""" """Open TCP socket with ``socket``-library and set it as escpos device.
By default raise an exception if device is not found.
:param raise_not_found: Default True.
False to log error but do not raise exception.
:raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
"""
if self._device:
self.close()
try:
# Open device
self.device = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.device = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.device.settimeout(self.timeout) self.device.settimeout(self.timeout)
self.device.connect((self.host, self.port)) self.device.connect((self.host, self.port))
except OSError as e:
if not self.device: # Raise exception or log error and cancel
print("Could not open socket for {0}".format(self.host)) self.device = None
if raise_not_found:
raise DeviceNotFoundError(
f"Could not open socket for {self.host}:\n{e}"
)
else:
logging.error("Network device %s not found", self.host)
return
logging.info("Network printer enabled")
def _raw(self, msg): def _raw(self, msg):
"""Print any command sent in raw format. """Print any command sent in raw format.
@ -99,6 +122,7 @@ class Network(Escpos):
"""Close TCP connection.""" """Close TCP connection."""
if not self._device: if not self._device:
return return
logging.info("Closing Network connection to printer %s", self.host)
try: try:
self.device.shutdown(socket.SHUT_RDWR) self.device.shutdown(socket.SHUT_RDWR)
except socket.error: except socket.error:

View File

@ -10,9 +10,11 @@
import functools import functools
from typing import Optional, Union import logging
from typing import Optional, Type, Union
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import DeviceNotFoundError
#: keeps track if the pyserial dependency could be loaded (:py:class:`escpos.printer.Serial`) #: keeps track if the pyserial dependency could be loaded (:py:class:`escpos.printer.Serial`)
_DEP_PYSERIAL = False _DEP_PYSERIAL = False
@ -20,6 +22,8 @@ _DEP_PYSERIAL = False
try: try:
import serial import serial
# import serial.SerialException
_DEP_PYSERIAL = True _DEP_PYSERIAL = True
except ImportError: except ImportError:
pass pass
@ -74,7 +78,7 @@ class Serial(Escpos):
@dependency_pyserial @dependency_pyserial
def __init__( def __init__(
self, self,
devfile: str = "/dev/ttyS0", devfile: str = "",
baudrate: int = 9600, baudrate: int = 9600,
bytesize: int = 8, bytesize: int = 8,
timeout: Union[int, float] = 1, timeout: Union[int, float] = 1,
@ -83,7 +87,7 @@ class Serial(Escpos):
xonxoff: bool = False, xonxoff: bool = False,
dsrdtr: bool = True, dsrdtr: bool = True,
*args, *args,
**kwargs **kwargs,
): ):
"""Initialize serial printer. """Initialize serial printer.
@ -113,11 +117,22 @@ class Serial(Escpos):
self.dsrdtr = dsrdtr self.dsrdtr = dsrdtr
@dependency_pyserial @dependency_pyserial
def open(self): def open(self, raise_not_found: bool = True) -> None:
"""Set up serial port and set is as escpos device.""" """Set up serial port and set is as escpos device.
if self.device and self.device.is_open:
By default raise an exception if device is not found.
:param raise_not_found: Default True.
False to log error but do not raise exception.
:raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
"""
if self._device and self.device.is_open:
self.close() self.close()
self.device = serial.Serial(
try:
# Open device
self.device: Optional[Type[serial.Serial]] = serial.Serial(
port=self.devfile, port=self.devfile,
baudrate=self.baudrate, baudrate=self.baudrate,
bytesize=self.bytesize, bytesize=self.bytesize,
@ -127,11 +142,17 @@ class Serial(Escpos):
xonxoff=self.xonxoff, xonxoff=self.xonxoff,
dsrdtr=self.dsrdtr, dsrdtr=self.dsrdtr,
) )
except (ValueError, serial.SerialException) as e:
if self.device: # Raise exception or log error and cancel
print("Serial printer enabled") self.device = None
if raise_not_found:
raise DeviceNotFoundError(
f"Unable to open serial printer on {self.devfile}:\n{e}"
)
else: else:
print("Unable to open serial printer on: {0}".format(str(self.devfile))) logging.error("Serial device %s not found", self.devfile)
return
logging.info("Serial printer enabled")
def _raw(self, msg): def _raw(self, msg):
"""Print any command sent in raw format. """Print any command sent in raw format.
@ -149,6 +170,7 @@ class Serial(Escpos):
"""Close Serial interface.""" """Close Serial interface."""
if not self._device: if not self._device:
return return
logging.info("Closing Serial connection to printer %s", self.devfile)
if self.device.is_open: if self.device.is_open:
self.device.flush() self.device.flush()
self.device.close() self.device.close()

View File

@ -8,10 +8,11 @@
:license: MIT :license: MIT
""" """
import functools import functools
from typing import Dict, Union import logging
from typing import Dict, Optional, Type, Union
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import USBNotFoundError from ..exceptions import DeviceNotFoundError, USBNotFoundError
#: keeps track if the usb dependency could be loaded (:py:class:`escpos.printer.Usb`) #: keeps track if the usb dependency could be loaded (:py:class:`escpos.printer.Usb`)
_DEP_USB = False _DEP_USB = False
@ -80,7 +81,7 @@ class Usb(Escpos):
in_ep: int = 0x82, in_ep: int = 0x82,
out_ep: int = 0x01, out_ep: int = 0x01,
*args, *args,
**kwargs **kwargs,
): ):
"""Initialize USB printer. """Initialize USB printer.
@ -103,21 +104,54 @@ class Usb(Escpos):
self.usb_args["idProduct"] = idProduct self.usb_args["idProduct"] = idProduct
@dependency_usb @dependency_usb
def open(self): def open(self, raise_not_found: bool = True) -> None:
"""Search device on USB tree and set it as escpos device.""" """Search device on USB tree and set it as escpos device.
self.device = usb.core.find(**self.usb_args)
if self.device is None:
raise USBNotFoundError("Device not found or cable not plugged in.")
self.idVendor = self.device.idVendor By default raise an exception if device is not found.
self.idProduct = self.device.idProduct
# pyusb has three backends: libusb0, libusb1 and openusb but :param raise_not_found: Default True.
# only libusb1 backend implements the methods is_kernel_driver_active() False to log error but do not raise exception.
# and detach_kernel_driver().
# This helps enable this library to work on Windows. :raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
if self.device.backend.__module__.endswith("libusb1"): :raises: :py:exc:`~escpos.exceptions.USBNotFoundError`
check_driver = None """
if self._device:
self.close()
# Open device
try:
self.device: Optional[Type[usb.core.Device]] = usb.core.find(
**self.usb_args
)
assert self.device, USBNotFoundError(
f"Device {tuple(self.usb_args.values())} not found"
+ " or cable not plugged in."
)
self._check_driver()
self._configure_usb()
except (AssertionError, usb.core.USBError) as e:
# Raise exception or log error and cancel
self.device = None
if raise_not_found:
raise DeviceNotFoundError(
f"Unable to open USB printer on {tuple(self.usb_args.values())}:"
+ f"\n{e}"
)
else:
logging.error("USB device %s not found", tuple(self.usb_args.values()))
return
logging.info("USB printer enabled")
def _check_driver(self) -> None:
"""Check the driver.
pyusb has three backends: libusb0, libusb1 and openusb but
only libusb1 backend implements the methods is_kernel_driver_active()
and detach_kernel_driver().
This helps enable this library to work on Windows.
"""
if self._device and self.device.backend.__module__.endswith("libusb1"):
check_driver: Optional[bool] = None
try: try:
check_driver = self.device.is_kernel_driver_active(0) check_driver = self.device.is_kernel_driver_active(0)
@ -131,13 +165,17 @@ class Usb(Escpos):
pass pass
except usb.core.USBError as e: except usb.core.USBError as e:
if check_driver is not None: if check_driver is not None:
print("Could not detatch kernel driver: {0}".format(str(e))) logging.error("Could not detatch kernel driver: %s", str(e))
def _configure_usb(self) -> None:
"""Configure USB."""
if not self.device:
return
try: try:
self.device.set_configuration() self.device.set_configuration()
self.device.reset() self.device.reset()
except usb.core.USBError as e: except usb.core.USBError as e:
print("Could not set configuration: {0}".format(str(e))) logging.error("Could not set configuration: %s", str(e))
def _raw(self, msg): def _raw(self, msg):
"""Print any command sent in raw format. """Print any command sent in raw format.
@ -156,5 +194,8 @@ class Usb(Escpos):
"""Release USB interface.""" """Release USB interface."""
if not self._device: if not self._device:
return return
logging.info(
"Closing Usb connection to printer %s", tuple(self.usb_args.values())
)
usb.util.dispose_resources(self.device) usb.util.dispose_resources(self.device)
self._device = False self._device = False

View File

@ -9,13 +9,16 @@
""" """
import functools import functools
import logging
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import DeviceNotFoundError
#: keeps track if the win32print dependency could be loaded (:py:class:`escpos.printer.Win32Raw`) #: keeps track if the win32print dependency could be loaded (:py:class:`escpos.printer.Win32Raw`)
_DEP_WIN32PRINT = False _DEP_WIN32PRINT = False
try: try:
import pywintypes
import win32print import win32print
_DEP_WIN32PRINT = True _DEP_WIN32PRINT = True
@ -73,27 +76,66 @@ class Win32Raw(Escpos):
def __init__(self, printer_name: str = "", *args, **kwargs): def __init__(self, printer_name: str = "", *args, **kwargs):
"""Initialize default printer.""" """Initialize default printer."""
Escpos.__init__(self, *args, **kwargs) Escpos.__init__(self, *args, **kwargs)
if printer_name is not None:
self.printer_name = printer_name self.printer_name = printer_name
else: self.job_name = ""
self.printer_name = win32print.GetDefaultPrinter()
@property
def printers(self) -> dict:
"""Available Windows printers."""
return {
printer["pPrinterName"]: printer
for printer in win32print.EnumPrinters(
win32print.PRINTER_ENUM_NAME, None, 4
)
}
@dependency_win32print @dependency_win32print
def open(self, job_name="python-escpos"): def open(
"""Open connection to default printer.""" self, job_name: str = "python-escpos", raise_not_found: bool = True
if self.printer_name is None: ) -> None:
raise Exception("Printer not found") """Open connection to default printer.
By default raise an exception if device is not found.
:param raise_not_found: Default True.
False to log error but do not raise exception.
:raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
"""
if self._device:
self.close()
self.job_name = job_name
try:
# Name validation, set default if no given name
self.printer_name = self.printer_name or win32print.GetDefaultPrinter()
assert self.printer_name in self.printers, "Incorrect printer name"
# Open device
self.device = win32print.OpenPrinter(self.printer_name) self.device = win32print.OpenPrinter(self.printer_name)
if self.device:
self.current_job = win32print.StartDocPrinter( self.current_job = win32print.StartDocPrinter(
self.device, 1, (job_name, None, "RAW") self.device, 1, (job_name, None, "RAW")
) )
win32print.StartPagePrinter(self.device) win32print.StartPagePrinter(self.device)
except (AssertionError, pywintypes.error) as e:
# Raise exception or log error and cancel
self.device = None
if raise_not_found:
raise DeviceNotFoundError(
f"Unable to start a print job for the printer {self.printer_name}:"
+ f"\n{e}"
)
else:
logging.error("Win32Raw printing %s not available", self.printer_name)
return
logging.info("Win32Raw printer enabled")
@dependency_win32print @dependency_win32print
def close(self): def close(self):
"""Close connection to default printer.""" """Close connection to default printer."""
if not self._device: if not self._device:
return return
logging.info("Closing Win32Raw connection to printer %s", self.printer_name)
win32print.EndPagePrinter(self.device) win32print.EndPagePrinter(self.device)
win32print.EndDocPrinter(self.device) win32print.EndDocPrinter(self.device)
win32print.ClosePrinter(self.device) win32print.ClosePrinter(self.device)