Separate method open() and constructor, enhance consistency between connectors. (#584)

* Add self-open mechanism
* self-open mechanism through the 'device' property
* Separate open() and store connection in 'device'
* Restore device status to False on close()
* Add default value to all params + type annotations
* Add generic DeviceNotFoundError exception
* Update USBNotFoundError return code
* Enhance connectors consistency
* Fix LP printer stall
* Fix LP waste of paper due to auto-flush + flush on close
* Move platform dependent printers' guard to init

---------

Co-authored-by: Patrick Kanzler <4189642+patkan@users.noreply.github.com>
This commit is contained in:
Benito López 2023-10-16 11:36:07 +02:00 committed by GitHub
parent 3a8af8a6f5
commit a00b98937b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 537 additions and 179 deletions

View File

@ -14,7 +14,7 @@ This module contains the abstract base class :py:class:`Escpos`.
import textwrap import textwrap
from abc import ABCMeta, abstractmethod # abstract base class support from abc import ABCMeta, abstractmethod # abstract base class support
from re import match as re_match from re import match as re_match
from typing import List, Optional, Union from typing import List, Literal, Optional, Union
import barcode import barcode
import qrcode import qrcode
@ -114,7 +114,11 @@ class Escpos(object):
class. class.
""" """
device = None # device status:
# False -> Not initialized
# None -> Initialized but not connected
# object -> The connection object (Usb(), Serial(), Network(), etc.)
_device: Union[Literal[False], Literal[None], object] = False
def __init__(self, profile=None, magic_encode_args=None, **kwargs) -> None: def __init__(self, profile=None, magic_encode_args=None, **kwargs) -> None:
"""Initialize ESCPOS Printer. """Initialize ESCPOS Printer.
@ -128,6 +132,27 @@ class Escpos(object):
"""Call self.close upon deletion.""" """Call self.close upon deletion."""
self.close() self.close()
@property
def device(self) -> Union[Literal[None], object]:
"""Implements a self-open mechanism.
An attempt to get the property before open the connection
will cause the connection to open.
"""
if self._device is False:
# Open device if not previously opened
self._device = None # None -> Initialized
self.open()
return self._device
@device.setter
def device(self, new_device: Union[Literal[False], Literal[None], object]):
self._device = new_device
def open(self):
"""Open a printer device/connection."""
pass
@abstractmethod @abstractmethod
def _raw(self, msg: bytes) -> None: def _raw(self, msg: bytes) -> None:
"""Send raw data to the printer. """Send raw data to the printer.

View File

@ -13,7 +13,8 @@ Result/Exit codes:
- `60` = Invalid pin to send Cash Drawer pulse :py:exc:`~escpos.exceptions.CashDrawerError` - `60` = Invalid pin to send Cash Drawer pulse :py:exc:`~escpos.exceptions.CashDrawerError`
- `70` = Invalid number of tab positions :py:exc:`~escpos.exceptions.TabPosError` - `70` = Invalid number of tab positions :py:exc:`~escpos.exceptions.TabPosError`
- `80` = Invalid char code :py:exc:`~escpos.exceptions.CharCodeError` - `80` = Invalid char code :py:exc:`~escpos.exceptions.CharCodeError`
- `90` = USB device not found :py:exc:`~escpos.exceptions.USBNotFoundError` - `90` = Device not found :py:exc:`~escpos.exceptions.DeviceNotFoundError`
- `91` = USB device not found :py:exc:`~escpos.exceptions.USBNotFoundError`
- `100` = Set variable out of range :py:exc:`~escpos.exceptions.SetVariableError` - `100` = Set variable out of range :py:exc:`~escpos.exceptions.SetVariableError`
- `200` = Configuration not found :py:exc:`~escpos.exceptions.ConfigNotFoundError` - `200` = Configuration not found :py:exc:`~escpos.exceptions.ConfigNotFoundError`
- `210` = Configuration syntax error :py:exc:`~escpos.exceptions.ConfigSyntaxError` - `210` = Configuration syntax error :py:exc:`~escpos.exceptions.ConfigSyntaxError`
@ -275,11 +276,35 @@ class CharCodeError(Error):
return "Valid char code must be set ({msg})".format(msg=self.msg) return "Valid char code must be set ({msg})".format(msg=self.msg)
class USBNotFoundError(Error): class DeviceNotFoundError(Error):
"""Device was not found (probably not plugged in). """Device was not found.
The device seems to be not accessible.
The return code for this exception is `90`.
inheritance:
.. inheritance-diagram:: escpos.exceptions.Error
:parts: 1
"""
def __init__(self, msg=""):
"""Initialize DeviceNotFoundError object."""
Error.__init__(self, msg)
self.msg = msg
self.resultcode = 90
def __str__(self):
"""Return string representation of DeviceNotFoundError."""
return f"Device not found ({self.msg})"
class USBNotFoundError(DeviceNotFoundError):
"""USB device was not found (probably not plugged in).
The USB device seems to be not plugged in. The USB device seems to be not plugged in.
The return code for this exception is `90`. The return code for this exception is `91`.
inheritance: inheritance:
@ -292,11 +317,11 @@ class USBNotFoundError(Error):
"""Initialize USBNotFoundError object.""" """Initialize USBNotFoundError object."""
Error.__init__(self, msg) Error.__init__(self, msg)
self.msg = msg self.msg = msg
self.resultcode = 90 self.resultcode = 91
def __str__(self): def __str__(self):
"""Return string representation of USBNotFoundError.""" """Return string representation of USBNotFoundError."""
return "USB device not found ({msg})".format(msg=self.msg) return f"USB device not found ({self.msg})"
class SetVariableError(Error): class SetVariableError(Error):

View File

@ -9,9 +9,12 @@
""" """
import functools import functools
import logging
import tempfile import tempfile
from typing import Literal, Optional, Type, Union
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import DeviceNotFoundError
#: keeps track if the pycups dependency could be loaded (:py:class:`escpos.printer.CupsPrinter`) #: keeps track if the pycups dependency could be loaded (:py:class:`escpos.printer.CupsPrinter`)
_DEP_PYCUPS = False _DEP_PYCUPS = False
@ -20,6 +23,9 @@ try:
import cups import cups
_DEP_PYCUPS = True _DEP_PYCUPS = True
# Store server defaults before further configuration
DEFAULT_HOST = cups.getServer()
DEFAULT_PORT = cups.getPort()
except ImportError: except ImportError:
pass pass
@ -78,49 +84,85 @@ class CupsPrinter(Escpos):
return is_usable() return is_usable()
@dependency_pycups @dependency_pycups
def __init__(self, printer_name=None, *args, **kwargs): def __init__(self, printer_name: str = "", *args, **kwargs):
"""Class constructor for CupsPrinter. """Class constructor for CupsPrinter.
:param printer_name: CUPS printer name (Optional) :param printer_name: CUPS printer name (Optional)
:type printer_name: str
:param host: CUPS server host/ip (Optional) :param host: CUPS server host/ip (Optional)
:type host: str :type host: str
:param port: CUPS server port (Optional) :param port: CUPS server port (Optional)
:type port: int :type port: int
""" """
Escpos.__init__(self, *args, **kwargs) Escpos.__init__(self, *args, **kwargs)
host, port = args or ( self.host, self.port = args or (
kwargs.get("host", cups.getServer()), kwargs.get("host", DEFAULT_HOST),
kwargs.get("port", cups.getPort()), kwargs.get("port", DEFAULT_PORT),
) )
cups.setServer(host) self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
cups.setPort(port)
self.conn = cups.Connection()
self.tmpfile = None
self.printer_name = printer_name self.printer_name = printer_name
self.job_name = "" self.job_name = ""
self.pending_job = False self.pending_job = False
self.open()
self._device: Union[
Literal[False], Literal[None], Type[cups.Connection]
] = False
@property @property
def printers(self): def printers(self) -> dict:
"""Available CUPS printers.""" """Available CUPS printers."""
return self.conn.getPrinters() if self.device:
return self.device.getPrinters()
return {}
def open(self, job_name="python-escpos"): def open(
self, job_name: str = "python-escpos", raise_not_found: bool = True
) -> None:
"""Set up a new print job and target the printer. """Set up a new print job and target the printer.
A call to this method is required to send new jobs to A call to this method is required to send new jobs to
the same CUPS connection. the CUPS connection after close.
Defaults to default CUPS printer. Defaults to default CUPS printer.
Creates a new temporary file buffer. Creates a new temporary file buffer.
By default raise an exception if device is not found.
:param raise_not_found: Default True.
False to log error but do not raise exception.
:raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
""" """
if self._device:
self.close()
cups.setServer(self.host)
cups.setPort(self.port)
self.job_name = job_name self.job_name = job_name
if self.printer_name not in self.printers: if self.tmpfile.closed:
self.printer_name = self.conn.getDefault()
self.tmpfile = tempfile.NamedTemporaryFile(delete=True) self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
try:
# Open device
self.device: Optional[Type[cups.Connection]] = cups.Connection()
if self.device:
# Name validation, set default if no given name
self.printer_name = self.printer_name or self.device.getDefault()
assert self.printer_name in self.printers, "Incorrect printer name"
except (RuntimeError, AssertionError) as e:
# Raise exception or log error and cancel
self.device = None
if raise_not_found:
raise DeviceNotFoundError(
f"Unable to start a print job for the printer {self.printer_name}:"
+ f"\n{e}"
)
else:
logging.error(
"CupsPrinter printing %s not available", self.printer_name
)
return
logging.info("CupsPrinter printer enabled")
def _raw(self, msg): def _raw(self, msg):
"""Append any command sent in raw format to temporary file. """Append any command sent in raw format to temporary file.
@ -134,14 +176,13 @@ class CupsPrinter(Escpos):
self.pending_job = False self.pending_job = False
raise ValueError("Printer job not opened") raise ValueError("Printer job not opened")
@dependency_pycups
def send(self): def send(self):
"""Send the print job to the printer.""" """Send the print job to the printer."""
if self.pending_job: if self.pending_job:
# Rewind tempfile # Rewind tempfile
self.tmpfile.seek(0) self.tmpfile.seek(0)
# Print temporary file via CUPS printer. # Print temporary file via CUPS printer.
self.conn.printFile( self.device.printFile(
self.printer_name, self.printer_name,
self.tmpfile.name, self.tmpfile.name,
self.job_name, self.job_name,
@ -173,8 +214,9 @@ class CupsPrinter(Escpos):
Send pending job to the printer if needed. Send pending job to the printer if needed.
""" """
if not self._device:
return
if self.pending_job: if self.pending_job:
self.send() self.send()
if self.conn: logging.info("Closing CUPS connection to printer %s", self.printer_name)
print("Closing CUPS connection to printer {}".format(self.printer_name)) self._device = False
self.conn = None

View File

@ -1,6 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""This module contains the implementation of the CupsPrinter printer driver. """This module contains the implementation of the File printer driver.
:author: python-escpos developers :author: python-escpos developers
:organization: `python-escpos <https://github.com/python-escpos>`_ :organization: `python-escpos <https://github.com/python-escpos>`_
@ -8,7 +8,11 @@
:license: MIT :license: MIT
""" """
import logging
from typing import IO, Literal, Optional, Union
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import DeviceNotFoundError
def is_usable() -> bool: def is_usable() -> bool:
@ -39,7 +43,7 @@ class File(Escpos):
""" """
return is_usable() return is_usable()
def __init__(self, devfile="/dev/usb/lp0", auto_flush=True, *args, **kwargs): def __init__(self, devfile: str = "", auto_flush: bool = True, *args, **kwargs):
"""Initialize file printer with device file. """Initialize file printer with device file.
:param devfile: Device file under dev filesystem :param devfile: Device file under dev filesystem
@ -48,17 +52,40 @@ class File(Escpos):
Escpos.__init__(self, *args, **kwargs) Escpos.__init__(self, *args, **kwargs)
self.devfile = devfile self.devfile = devfile
self.auto_flush = auto_flush self.auto_flush = auto_flush
self.open()
def open(self): self._device: Union[Literal[False], Literal[None], IO[bytes]] = False
"""Open system file."""
self.device = open(self.devfile, "wb")
if self.device is None: def open(self, raise_not_found: bool = True) -> None:
print("Could not open the specified file {0}".format(self.devfile)) """Open system file.
def flush(self): By default raise an exception if device is not found.
:param raise_not_found: Default True.
False to log error but do not raise exception.
:raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
"""
if self._device:
self.close()
try:
# Open device
self.device: Optional[IO[bytes]] = open(self.devfile, "wb")
except OSError as e:
# Raise exception or log error and cancel
self.device = None
if raise_not_found:
raise DeviceNotFoundError(
f"Could not open the specified file {self.devfile}:\n{e}"
)
else:
logging.error("File printer %s not found", self.devfile)
return
logging.info("File printer enabled")
def flush(self) -> None:
"""Flush printing content.""" """Flush printing content."""
if self.device:
self.device.flush() self.device.flush()
def _raw(self, msg): def _raw(self, msg):
@ -71,8 +98,12 @@ class File(Escpos):
if self.auto_flush: if self.auto_flush:
self.flush() self.flush()
def close(self): def close(self) -> None:
"""Close system file.""" """Close system file."""
if self.device is not None: if not self._device:
self.device.flush() return
self.device.close() logging.info("Closing File connection to printer %s", self.devfile)
if not self.auto_flush:
self.flush()
self._device.close()
self._device = False

View File

@ -1,6 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""This module contains the implementation of the CupsPrinter printer driver. """This module contains the implementation of the LP printer driver.
:author: python-escpos developers :author: python-escpos developers
:organization: `python-escpos <https://github.com/python-escpos>`_ :organization: `python-escpos <https://github.com/python-escpos>`_
@ -9,11 +9,13 @@
""" """
import functools import functools
import os import logging
import subprocess import subprocess
import sys import sys
from typing import Literal, Optional, Union
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import DeviceNotFoundError
def is_usable() -> bool: def is_usable() -> bool:
@ -61,40 +63,124 @@ class LP(Escpos):
""" """
return is_usable() return is_usable()
def __init__(self, printer_name: str, *args, **kwargs): @dependency_linux_lp
def __init__(self, printer_name: str = "", *args, **kwargs):
"""LP class constructor. """LP class constructor.
:param printer_name: CUPS printer name (Optional) :param printer_name: CUPS printer name (Optional)
:type printer_name: str
:param auto_flush: Automatic flush after every _raw() (Optional) :param auto_flush: Automatic flush after every _raw() (Optional)
:type auto_flush: bool :type auto_flush: bool (Defaults False)
""" """
Escpos.__init__(self, *args, **kwargs) Escpos.__init__(self, *args, **kwargs)
self.printer_name = printer_name self.printer_name = printer_name
self.auto_flush = kwargs.get("auto_flush", True) self.auto_flush = kwargs.get("auto_flush", False)
self.open() self._flushed = False
@dependency_linux_lp self._device: Union[Literal[False], Literal[None], subprocess.Popen] = False
def open(self):
"""Invoke _lp_ in a new subprocess and wait for commands.""" @property
self.lp = subprocess.Popen( def printers(self) -> dict:
["lp", "-d", self.printer_name, "-o", "raw"], """Available CUPS printers."""
stdin=subprocess.PIPE, p_names = subprocess.run(
stdout=open(os.devnull, "w"), ["lpstat", "-e"], # Get printer names
capture_output=True,
text=True,
) )
p_devs = subprocess.run(
["lpstat", "-v"], # Get attached devices
capture_output=True,
text=True,
)
# List and trim output lines
names = [name for name in p_names.stdout.split("\n") if name]
devs = [dev for dev in p_devs.stdout.split("\n") if dev]
# return a dict of {printer name: attached device} pairs
return {name: dev.split()[-1] for name in names for dev in devs if name in dev}
def close(self): def _get_system_default_printer(self) -> str:
"""Return the system's default printer name."""
p_name = subprocess.run(
["lpstat", "-d"],
capture_output=True,
text=True,
)
name = p_name.stdout.split()[-1]
if name not in self.printers:
return ""
return name
def open(
self,
job_name: str = "python-escpos",
raise_not_found: bool = True,
_close_opened: bool = True,
) -> None:
"""Invoke _lp_ in a new subprocess and wait for commands.
By default raise an exception if device is not found.
:param raise_not_found: Default True.
False to log error but do not raise exception.
:raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
"""
if self._device and _close_opened:
self.close()
self._is_closing = False
self.job_name = job_name
try:
# Name validation, set default if no given name
self.printer_name = self.printer_name or self._get_system_default_printer()
assert self.printer_name in self.printers, "Incorrect printer name"
# Open device
self.device: Optional[subprocess.Popen] = subprocess.Popen(
["lp", "-d", self.printer_name, "-t", self.job_name, "-o", "raw"],
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except (AssertionError, subprocess.SubprocessError) as e:
# Raise exception or log error and cancel
self.device = None
if raise_not_found:
raise DeviceNotFoundError(
f"Unable to start a print job for the printer {self.printer_name}:"
+ f"\n{e}"
)
else:
logging.error("LP printing %s not available", self.printer_name)
return
logging.info("LP printer enabled")
def close(self) -> None:
"""Stop the subprocess.""" """Stop the subprocess."""
self.lp.terminate() if not self._device:
return
logging.info("Closing LP connection to printer %s", self.printer_name)
self._is_closing = True
if not self.auto_flush:
self.flush()
self._device.terminate()
self._device = False
def flush(self): def flush(self) -> None:
"""End line and wait for new commands.""" """End line and wait for new commands."""
if self.lp.stdin.writable(): if not self.device or not self.device.stdin:
self.lp.stdin.write(b"\n") return
if self.lp.stdin.closed is False:
self.lp.stdin.close() if self._flushed:
self.lp.wait() return
self.open()
if self.device.stdin.writable():
self.device.stdin.write(b"\n")
if self.device.stdin.closed is False:
self.device.stdin.close()
self.device.wait()
self._flushed = True
if not self._is_closing:
self.open(_close_opened=False)
def _raw(self, msg): def _raw(self, msg):
"""Write raw command(s) to the printer. """Write raw command(s) to the printer.
@ -102,9 +188,10 @@ class LP(Escpos):
:param msg: arbitrary code to be printed :param msg: arbitrary code to be printed
:type msg: bytes :type msg: bytes
""" """
if self.lp.stdin.writable(): if self.device.stdin.writable():
self.lp.stdin.write(msg) self.device.stdin.write(msg)
else: else:
raise Exception("Not a valid pipe for lp process") raise subprocess.SubprocessError("Not a valid pipe for lp process")
self._flushed = False
if self.auto_flush: if self.auto_flush:
self.flush() self.flush()

View File

@ -8,9 +8,12 @@
:license: MIT :license: MIT
""" """
import logging
import socket import socket
from typing import Literal, Optional, Union
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import DeviceNotFoundError
def is_usable() -> bool: def is_usable() -> bool:
@ -54,7 +57,14 @@ class Network(Escpos):
""" """
return is_usable() return is_usable()
def __init__(self, host, port=9100, timeout=60, *args, **kwargs): def __init__(
self,
host: str = "",
port: int = 9100,
timeout: Union[int, float] = 60,
*args,
**kwargs,
):
"""Initialize network printer. """Initialize network printer.
:param host: Printer's host name or IP address :param host: Printer's host name or IP address
@ -65,16 +75,40 @@ class Network(Escpos):
self.host = host self.host = host
self.port = port self.port = port
self.timeout = timeout self.timeout = timeout
self.open()
def open(self): self._device: Union[Literal[False], Literal[None], socket.socket] = False
"""Open TCP socket with ``socket``-library and set it as escpos device."""
self.device = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def open(self, raise_not_found: bool = True) -> None:
"""Open TCP socket with ``socket``-library and set it as escpos device.
By default raise an exception if device is not found.
:param raise_not_found: Default True.
False to log error but do not raise exception.
:raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
"""
if self._device:
self.close()
try:
# Open device
self.device: Optional[socket.socket] = socket.socket(
socket.AF_INET, socket.SOCK_STREAM
)
self.device.settimeout(self.timeout) self.device.settimeout(self.timeout)
self.device.connect((self.host, self.port)) self.device.connect((self.host, self.port))
except OSError as e:
if self.device is None: # Raise exception or log error and cancel
print("Could not open socket for {0}".format(self.host)) self.device = None
if raise_not_found:
raise DeviceNotFoundError(
f"Could not open socket for {self.host}:\n{e}"
)
else:
logging.error("Network device %s not found", self.host)
return
logging.info("Network printer enabled")
def _raw(self, msg): def _raw(self, msg):
"""Print any command sent in raw format. """Print any command sent in raw format.
@ -88,11 +122,14 @@ class Network(Escpos):
"""Read data from the TCP socket.""" """Read data from the TCP socket."""
return self.device.recv(16) return self.device.recv(16)
def close(self): def close(self) -> None:
"""Close TCP connection.""" """Close TCP connection."""
if self.device is not None: if not self._device:
return
logging.info("Closing Network connection to printer %s", self.host)
try: try:
self.device.shutdown(socket.SHUT_RDWR) self._device.shutdown(socket.SHUT_RDWR)
except socket.error: except socket.error:
pass pass
self.device.close() self._device.close()
self._device = False

View File

@ -1,6 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""This module contains the implementation of the CupsPrinter printer driver. """This module contains the implementation of the Serial printer driver.
:author: python-escpos developers :author: python-escpos developers
:organization: `python-escpos <https://github.com/python-escpos>`_ :organization: `python-escpos <https://github.com/python-escpos>`_
@ -10,8 +10,11 @@
import functools import functools
import logging
from typing import Literal, Optional, Union
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import DeviceNotFoundError
#: keeps track if the pyserial dependency could be loaded (:py:class:`escpos.printer.Serial`) #: keeps track if the pyserial dependency could be loaded (:py:class:`escpos.printer.Serial`)
_DEP_PYSERIAL = False _DEP_PYSERIAL = False
@ -73,16 +76,16 @@ class Serial(Escpos):
@dependency_pyserial @dependency_pyserial
def __init__( def __init__(
self, self,
devfile="/dev/ttyS0", devfile: str = "",
baudrate=9600, baudrate: int = 9600,
bytesize=8, bytesize: int = 8,
timeout=1, timeout: Union[int, float] = 1,
parity=None, parity: Optional[str] = None,
stopbits=None, stopbits: Optional[int] = None,
xonxoff=False, xonxoff: bool = False,
dsrdtr=True, dsrdtr: bool = True,
*args, *args,
**kwargs **kwargs,
): ):
"""Initialize serial printer. """Initialize serial printer.
@ -111,14 +114,26 @@ class Serial(Escpos):
self.xonxoff = xonxoff self.xonxoff = xonxoff
self.dsrdtr = dsrdtr self.dsrdtr = dsrdtr
self.open() self._device: Union[Literal[False], Literal[None], serial.Serial] = False
@dependency_pyserial @dependency_pyserial
def open(self): def open(self, raise_not_found: bool = True) -> None:
"""Set up serial port and set is as escpos device.""" """Set up serial port and set is as escpos device.
if self.device is not None and self.device.is_open:
By default raise an exception if device is not found.
:param raise_not_found: Default True.
False to log error but do not raise exception.
:raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
"""
if self._device:
if self.device and self.device.is_open:
self.close() self.close()
self.device = serial.Serial(
try:
# Open device
self.device: Optional[serial.Serial] = serial.Serial(
port=self.devfile, port=self.devfile,
baudrate=self.baudrate, baudrate=self.baudrate,
bytesize=self.bytesize, bytesize=self.bytesize,
@ -128,11 +143,17 @@ class Serial(Escpos):
xonxoff=self.xonxoff, xonxoff=self.xonxoff,
dsrdtr=self.dsrdtr, dsrdtr=self.dsrdtr,
) )
except (ValueError, serial.SerialException) as e:
if self.device is not None: # Raise exception or log error and cancel
print("Serial printer enabled") self.device = None
if raise_not_found:
raise DeviceNotFoundError(
f"Unable to open serial printer on {self.devfile}:\n{e}"
)
else: else:
print("Unable to open serial printer on: {0}".format(str(self.devfile))) logging.error("Serial device %s not found", self.devfile)
return
logging.info("Serial printer enabled")
def _raw(self, msg): def _raw(self, msg):
"""Print any command sent in raw format. """Print any command sent in raw format.
@ -146,8 +167,12 @@ class Serial(Escpos):
"""Read the data buffer and return it to the caller.""" """Read the data buffer and return it to the caller."""
return self.device.read(16) return self.device.read(16)
def close(self): def close(self) -> None:
"""Close Serial interface.""" """Close Serial interface."""
if self.device is not None and self.device.is_open: if not self._device:
self.device.flush() return
self.device.close() logging.info("Closing Serial connection to printer %s", self.devfile)
if self._device and self._device.is_open:
self._device.flush()
self._device.close()
self._device = False

View File

@ -8,9 +8,11 @@
:license: MIT :license: MIT
""" """
import functools import functools
import logging
from typing import Dict, Literal, Optional, Type, Union
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import USBNotFoundError from ..exceptions import DeviceNotFoundError, USBNotFoundError
#: keeps track if the usb dependency could be loaded (:py:class:`escpos.printer.Usb`) #: keeps track if the usb dependency could be loaded (:py:class:`escpos.printer.Usb`)
_DEP_USB = False _DEP_USB = False
@ -72,14 +74,14 @@ class Usb(Escpos):
def __init__( def __init__(
self, self,
idVendor, idVendor: str = "",
idProduct, idProduct: str = "",
usb_args=None, usb_args: Dict[str, str] = {},
timeout=0, timeout: Union[int, float] = 0,
in_ep=0x82, in_ep: int = 0x82,
out_ep=0x01, out_ep: int = 0x01,
*args, *args,
**kwargs **kwargs,
): ):
"""Initialize USB printer. """Initialize USB printer.
@ -95,32 +97,65 @@ class Usb(Escpos):
self.in_ep = in_ep self.in_ep = in_ep
self.out_ep = out_ep self.out_ep = out_ep
usb_args = usb_args or {} self.usb_args = usb_args or {}
if idVendor: if idVendor:
usb_args["idVendor"] = idVendor self.usb_args["idVendor"] = idVendor
if idProduct: if idProduct:
usb_args["idProduct"] = idProduct self.usb_args["idProduct"] = idProduct
self.open(usb_args)
self._device: Union[
Literal[False], Literal[None], Type[usb.core.Device]
] = False
@dependency_usb @dependency_usb
def open(self, usb_args): def open(self, raise_not_found: bool = True) -> None:
"""Search device on USB tree and set it as escpos device. """Search device on USB tree and set it as escpos device.
:param usb_args: USB arguments By default raise an exception if device is not found.
:param raise_not_found: Default True.
False to log error but do not raise exception.
:raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
:raises: :py:exc:`~escpos.exceptions.USBNotFoundError`
""" """
self.device = usb.core.find(**usb_args) if self._device:
if self.device is None: self.close()
raise USBNotFoundError("Device not found or cable not plugged in.")
self.idVendor = self.device.idVendor # Open device
self.idProduct = self.device.idProduct try:
self.device: Optional[Type[usb.core.Device]] = usb.core.find(
**self.usb_args
)
assert self.device, USBNotFoundError(
f"Device {tuple(self.usb_args.values())} not found"
+ " or cable not plugged in."
)
self._check_driver()
self._configure_usb()
except (AssertionError, usb.core.USBError) as e:
# Raise exception or log error and cancel
self.device = None
if raise_not_found:
raise DeviceNotFoundError(
f"Unable to open USB printer on {tuple(self.usb_args.values())}:"
+ f"\n{e}"
)
else:
logging.error("USB device %s not found", tuple(self.usb_args.values()))
return
logging.info("USB printer enabled")
# pyusb has three backends: libusb0, libusb1 and openusb but def _check_driver(self) -> None:
# only libusb1 backend implements the methods is_kernel_driver_active() """Check the driver.
# and detach_kernel_driver().
# This helps enable this library to work on Windows. pyusb has three backends: libusb0, libusb1 and openusb but
if self.device.backend.__module__.endswith("libusb1"): only libusb1 backend implements the methods is_kernel_driver_active()
check_driver = None and detach_kernel_driver().
This helps enable this library to work on Windows.
"""
if self.device and self.device.backend.__module__.endswith("libusb1"):
check_driver: Optional[bool] = None
try: try:
check_driver = self.device.is_kernel_driver_active(0) check_driver = self.device.is_kernel_driver_active(0)
@ -134,13 +169,17 @@ class Usb(Escpos):
pass pass
except usb.core.USBError as e: except usb.core.USBError as e:
if check_driver is not None: if check_driver is not None:
print("Could not detatch kernel driver: {0}".format(str(e))) logging.error("Could not detatch kernel driver: %s", str(e))
def _configure_usb(self) -> None:
"""Configure USB."""
if not self.device:
return
try: try:
self.device.set_configuration() self.device.set_configuration()
self.device.reset() self.device.reset()
except usb.core.USBError as e: except usb.core.USBError as e:
print("Could not set configuration: {0}".format(str(e))) logging.error("Could not set configuration: %s", str(e))
def _raw(self, msg): def _raw(self, msg):
"""Print any command sent in raw format. """Print any command sent in raw format.
@ -155,8 +194,12 @@ class Usb(Escpos):
return self.device.read(self.in_ep, 16) return self.device.read(self.in_ep, 16)
@dependency_usb @dependency_usb
def close(self): def close(self) -> None:
"""Release USB interface.""" """Release USB interface."""
if self.device: if not self._device:
usb.util.dispose_resources(self.device) return
self.device = None logging.info(
"Closing Usb connection to printer %s", tuple(self.usb_args.values())
)
usb.util.dispose_resources(self._device)
self._device = False

View File

@ -1,6 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""This module contains the implementation of the CupsPrinter printer driver. """This module contains the implementation of the Win32Raw printer driver.
:author: python-escpos developers :author: python-escpos developers
:organization: `python-escpos <https://github.com/python-escpos>`_ :organization: `python-escpos <https://github.com/python-escpos>`_
@ -9,12 +9,16 @@
""" """
import functools import functools
import logging
from typing import Literal, Optional, Type, Union
from ..escpos import Escpos from ..escpos import Escpos
from ..exceptions import DeviceNotFoundError
#: keeps track if the win32print dependency could be loaded (:py:class:`escpos.printer.Win32Raw`) #: keeps track if the win32print dependency could be loaded (:py:class:`escpos.printer.Win32Raw`)
_DEP_WIN32PRINT = False _DEP_WIN32PRINT = False
try: try:
import win32print import win32print
@ -70,38 +74,78 @@ class Win32Raw(Escpos):
return is_usable() return is_usable()
@dependency_win32print @dependency_win32print
def __init__(self, printer_name=None, *args, **kwargs): def __init__(self, printer_name: str = "", *args, **kwargs):
"""Initialize default printer.""" """Initialize default printer."""
Escpos.__init__(self, *args, **kwargs) Escpos.__init__(self, *args, **kwargs)
if printer_name is not None:
self.printer_name = printer_name self.printer_name = printer_name
else: self.job_name = ""
self.printer_name = win32print.GetDefaultPrinter()
self.hPrinter = None
self.open()
@dependency_win32print self._device: Union[
def open(self, job_name="python-escpos"): Literal[False],
"""Open connection to default printer.""" Literal[None],
if self.printer_name is None: Type[win32print.OpenPrinter],
raise Exception("Printer not found") ] = False
self.hPrinter = win32print.OpenPrinter(self.printer_name)
@property
def printers(self) -> dict:
"""Available Windows printers."""
return {
printer["pPrinterName"]: printer
for printer in win32print.EnumPrinters(win32print.PRINTER_ENUM_NAME, "", 4)
}
def open(
self, job_name: str = "python-escpos", raise_not_found: bool = True
) -> None:
"""Open connection to default printer.
By default raise an exception if device is not found.
:param raise_not_found: Default True.
False to log error but do not raise exception.
:raises: :py:exc:`~escpos.exceptions.DeviceNotFoundError`
"""
if self._device:
self.close()
self.job_name = job_name
try:
# Name validation, set default if no given name
self.printer_name = self.printer_name or win32print.GetDefaultPrinter()
assert self.printer_name in self.printers, "Incorrect printer name"
# Open device
self.device: Optional[
Type[win32print.OpenPrinter]
] = win32print.OpenPrinter(self.printer_name)
if self.device:
self.current_job = win32print.StartDocPrinter( self.current_job = win32print.StartDocPrinter(
self.hPrinter, 1, (job_name, None, "RAW") self.device, 1, (job_name, None, "RAW")
) )
win32print.StartPagePrinter(self.hPrinter) win32print.StartPagePrinter(self.device)
except AssertionError as e:
@dependency_win32print # Raise exception or log error and cancel
def close(self): self.device = None
"""Close connection to default printer.""" if raise_not_found:
if not self.hPrinter: raise DeviceNotFoundError(
f"Unable to start a print job for the printer {self.printer_name}:"
+ f"\n{e}"
)
else:
logging.error("Win32Raw printing %s not available", self.printer_name)
return return
win32print.EndPagePrinter(self.hPrinter) logging.info("Win32Raw printer enabled")
win32print.EndDocPrinter(self.hPrinter)
win32print.ClosePrinter(self.hPrinter) def close(self) -> None:
self.hPrinter = None """Close connection to default printer."""
if not self._device:
return
logging.info("Closing Win32Raw connection to printer %s", self.printer_name)
win32print.EndPagePrinter(self._device)
win32print.EndDocPrinter(self._device)
win32print.ClosePrinter(self._device)
self._device = False
@dependency_win32print
def _raw(self, msg): def _raw(self, msg):
"""Print any command sent in raw format. """Print any command sent in raw format.
@ -110,6 +154,6 @@ class Win32Raw(Escpos):
""" """
if self.printer_name is None: if self.printer_name is None:
raise Exception("Printer not found") raise Exception("Printer not found")
if self.hPrinter is None: if not self.device:
raise Exception("Printer job not opened") raise Exception("Printer job not opened")
win32print.WritePrinter(self.hPrinter, msg) win32print.WritePrinter(self.device, msg)

View File

@ -57,7 +57,6 @@ deps = mypy
types-appdirs types-appdirs
types-Pillow types-Pillow
types-pyserial types-pyserial
types-pywin32
hypothesis>=6.83 hypothesis>=6.83
jaconv jaconv
commands = mypy src test commands = mypy src test