split off dependencies for optional installation (#546)
* add inheritance diagrams to all printers and exceptions * split off printer implementations into separate files * add wrapper that thros RuntimeError if not importable * add dependency check for lp * add dependency check for pyserial * added check for usability * import Win32Raw * include WIn32Raw in documentation * enable all extras on tox * update github workflow
This commit is contained in:
parent
fbabd8ed88
commit
3177c8d411
|
@ -27,6 +27,7 @@ jobs:
|
|||
python-version: ${{ matrix.python-version }}
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
sudo apt-get install -y graphviz libenchant-2-2 gcc libcups2-dev python3-dev xindy
|
||||
python -m pip install --upgrade pip
|
||||
pip install flake8 pytest tox tox-gh-actions
|
||||
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
|
||||
|
|
2
INSTALL
2
INSTALL
|
@ -4,7 +4,7 @@ python-escpos
|
|||
This library is available over pypi. So for most of the use-cases it should be sufficient to run
|
||||
|
||||
```
|
||||
pip install python-escpos --user # add --pre if you want to install pre-releases
|
||||
pip install python-escpos[all] --user # add --pre if you want to install pre-releases
|
||||
```
|
||||
|
||||
For more information please read the documentation at https://python-escpos.readthedocs.io/en/latest/user/installation.html
|
||||
|
|
|
@ -12,7 +12,7 @@ so you can simply enter:
|
|||
|
||||
::
|
||||
|
||||
pip install python-escpos
|
||||
pip install python-escpos[all]
|
||||
|
||||
This should install all necessary dependencies. Apart from that
|
||||
python-escpos is for some versions also available as a Debian package.
|
||||
|
@ -21,6 +21,14 @@ always install from PyPi.
|
|||
If you use the ``--pre`` parameter for ``pip``, you will get the latest
|
||||
pre-release.
|
||||
|
||||
The following installation options exist:
|
||||
|
||||
* `all`: install all packages available for this platform
|
||||
* `usb`: install packages required for USB printers
|
||||
* `serial`: install packages required for serial printers
|
||||
* `win32`: install packages required for win32 printing (only Windows)
|
||||
* `cups`: install packages required for CUPS printing
|
||||
|
||||
Setup udev for USB-Printers
|
||||
---------------------------
|
||||
1. Get the *Product ID* and *Vendor ID* from the lsusb command
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
Printers
|
||||
========
|
||||
|
||||
:Last Reviewed: 2023-08-10
|
||||
:Last Reviewed: 2023-08-23
|
||||
|
||||
As of now there are 8 different types of printer implementations.
|
||||
|
||||
|
@ -116,4 +116,8 @@ This driver uses a native WIN32 interface of Windows in order to print.
|
|||
Please refer to the code for documentation as this driver is currently
|
||||
not included in the documentation build.
|
||||
|
||||
.. todo:: Include Win32Raw in documentation build
|
||||
.. autoclass:: escpos.printer.Win32Raw
|
||||
:members:
|
||||
:special-members:
|
||||
:member-order: bysource
|
||||
:noindex:
|
17
setup.cfg
17
setup.cfg
|
@ -35,10 +35,8 @@ python_requires = >=3.8
|
|||
zip_safe = false
|
||||
include_package_data = true
|
||||
install_requires =
|
||||
pyusb>=1.0.0
|
||||
Pillow>=2.0
|
||||
qrcode>=4.0
|
||||
pyserial
|
||||
python-barcode>=0.9.1,<1
|
||||
setuptools
|
||||
six
|
||||
|
@ -60,6 +58,21 @@ tests_require =
|
|||
flake8
|
||||
sphinxcontrib-spelling>=7.2.0
|
||||
|
||||
[options.extras_require]
|
||||
usb =
|
||||
pyusb>=1.0.0
|
||||
serial =
|
||||
pyserial
|
||||
cups =
|
||||
pycups
|
||||
win32 =
|
||||
pywin32; platform_system=='Windows'
|
||||
all =
|
||||
pyusb>=1.0.0
|
||||
pyserial
|
||||
pycups
|
||||
pywin32; platform_system=='Windows'
|
||||
|
||||
[flake8]
|
||||
exclude = .git,.tox,.github,.eggs,__pycache__,doc/conf.py,build,dist,capabilities-data,test,src/escpos/constants.py
|
||||
max-line-length = 120
|
||||
|
|
|
@ -5,7 +5,7 @@ This module contains constants that are described in the esc/pos-documentation.
|
|||
Since there is no definitive and unified specification for all esc/pos-like printers the constants could later be
|
||||
moved to `capabilities` as in `escpos-php by @mike42 <https://github.com/mike42/escpos-php>`_.
|
||||
|
||||
:author: `Manuel F Martinez <manpaz@bashlinux.com>`_ and others
|
||||
:author: python-escpos developers
|
||||
:organization: Bashlinux and `python-escpos <https://github.com/python-escpos>`_
|
||||
:copyright: Copyright (c) 2012-2017 Bashlinux and python-escpos
|
||||
:license: MIT
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
This module contains the abstract base class :py:class:`Escpos`.
|
||||
|
||||
:author: `Manuel F Martinez <manpaz@bashlinux.com>`_ and others
|
||||
:author: python-escpos developers
|
||||
:organization: Bashlinux and `python-escpos <https://github.com/python-escpos>`_
|
||||
:copyright: Copyright (c) 2012-2017 Bashlinux and python-escpos
|
||||
:license: MIT
|
||||
|
|
|
@ -19,7 +19,7 @@ Result/Exit codes:
|
|||
- `210` = Configuration syntax error :py:exc:`~escpos.exceptions.ConfigSyntaxError`
|
||||
- `220` = Configuration section not found :py:exc:`~escpos.exceptions.ConfigSectionMissingError`
|
||||
|
||||
:author: `Manuel F Martinez <manpaz@bashlinux.com>`_ and others
|
||||
:author: python-escpos developers
|
||||
:organization: Bashlinux and `python-escpos <https://github.com/python-escpos>`_
|
||||
:copyright: Copyright (c) 2012-2017 Bashlinux and python-escpos
|
||||
:license: MIT
|
||||
|
@ -27,7 +27,14 @@ Result/Exit codes:
|
|||
|
||||
|
||||
class Error(Exception):
|
||||
"""Base class for ESC/POS errors."""
|
||||
"""Base class for ESC/POS errors.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.Error
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg, status=None):
|
||||
"""Initialize Error object."""
|
||||
|
@ -48,6 +55,12 @@ class BarcodeTypeError(Error):
|
|||
This exception indicates that no known barcode-type has been entered. The barcode-type has to be
|
||||
one of those specified in :py:meth:`escpos.escpos.Escpos.barcode`.
|
||||
The returned error code is `10`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.BarcodeTypeError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
@ -67,6 +80,12 @@ class BarcodeSizeError(Error):
|
|||
This exception indicates that the values for the barcode size are out of range.
|
||||
The size of the barcode has to be in the range that is specified in :py:meth:`escpos.escpos.Escpos.barcode`.
|
||||
The resulting returncode is `20`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.BarcodeSizeError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
@ -86,6 +105,12 @@ class BarcodeCodeError(Error):
|
|||
No data for the barcode has been supplied in :py:meth:`escpos.escpos.Escpos.barcode` or the the `check` parameter
|
||||
was True and the check failed.
|
||||
The returncode for this exception is `30`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.BarcodeCodeError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
@ -103,6 +128,12 @@ class ImageSizeError(Error):
|
|||
"""Image height is longer than 255px and can't be printed.
|
||||
|
||||
The returncode for this exception is `40`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.ImageSizeError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
@ -122,6 +153,12 @@ class ImageWidthError(Error):
|
|||
"""Image width is too large.
|
||||
|
||||
The return code for this exception is `41`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.ImageWidthError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
@ -140,6 +177,12 @@ class TextError(Error):
|
|||
|
||||
This exception is raised when an empty string is passed to :py:meth:`escpos.escpos.Escpos.text`.
|
||||
The returncode for this exception is `50`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.TextError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
@ -160,6 +203,12 @@ class CashDrawerError(Error):
|
|||
|
||||
A valid pin number has to be passed onto the method :py:meth:`escpos.escpos.Escpos.cashdraw`.
|
||||
The returncode for this exception is `60`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.CashDrawerError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
@ -181,6 +230,12 @@ class TabPosError(Error):
|
|||
|
||||
This exception is raised by :py:meth:`escpos.escpos.Escpos.control`.
|
||||
The returncode for this exception is `70`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.TabPosError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
@ -201,6 +256,12 @@ class CharCodeError(Error):
|
|||
|
||||
The supplied charcode-name in :py:meth:`escpos.escpos.Escpos.charcode` is unknown.
|
||||
Ths returncode for this exception is `80`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.CharCodeError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
@ -219,6 +280,12 @@ class USBNotFoundError(Error):
|
|||
|
||||
The USB device seems to be not plugged in.
|
||||
Ths returncode for this exception is `90`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.USBNotFoundError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
@ -237,6 +304,12 @@ class SetVariableError(Error):
|
|||
|
||||
Check set variables against minimum and maximum values
|
||||
Ths returncode for this exception is `100`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.SetVariableError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
@ -258,6 +331,12 @@ class ConfigNotFoundError(Error):
|
|||
|
||||
The default or passed configuration file could not be read
|
||||
Ths returncode for this exception is `200`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.ConfigNotFoundError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
@ -276,6 +355,12 @@ class ConfigSyntaxError(Error):
|
|||
|
||||
The syntax is incorrect
|
||||
Ths returncode for this exception is `210`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.ConfigSyntaxError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
@ -294,6 +379,12 @@ class ConfigSectionMissingError(Error):
|
|||
|
||||
The part of the config asked for doesn't exist in the loaded configuration
|
||||
Ths returncode for this exception is `220`.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.exceptions.ConfigSectionMissingError
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, msg=""):
|
||||
|
|
|
@ -1,614 +0,0 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""This module contains the implementations of abstract base class :py:class:`Escpos`.
|
||||
|
||||
:author: `Manuel F Martinez <manpaz@bashlinux.com>`_ and others
|
||||
:organization: Bashlinux and `python-escpos <https://github.com/python-escpos>`_
|
||||
:copyright: Copyright (c) 2012-2017 Bashlinux and python-escpos
|
||||
:license: MIT
|
||||
"""
|
||||
|
||||
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
import serial
|
||||
import usb.core
|
||||
import usb.util
|
||||
|
||||
from .escpos import Escpos
|
||||
from .exceptions import USBNotFoundError
|
||||
|
||||
_WIN32PRINT = False
|
||||
try:
|
||||
import win32print
|
||||
|
||||
_WIN32PRINT = True
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
_CUPSPRINT = False
|
||||
try:
|
||||
import tempfile
|
||||
|
||||
import cups
|
||||
|
||||
_CUPSPRINT = True
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
class Usb(Escpos):
|
||||
"""USB printer.
|
||||
|
||||
This class describes a printer that natively speaks USB.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.printer.Usb
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
idVendor,
|
||||
idProduct,
|
||||
usb_args=None,
|
||||
timeout=0,
|
||||
in_ep=0x82,
|
||||
out_ep=0x01,
|
||||
*args,
|
||||
**kwargs
|
||||
): # noqa: N803
|
||||
"""Initialize USB printer.
|
||||
|
||||
:param idVendor: Vendor ID
|
||||
:param idProduct: Product ID
|
||||
:param usb_args: Optional USB arguments (e.g. custom_match)
|
||||
:param timeout: Is the time limit of the USB operation. Default without timeout.
|
||||
:param in_ep: Input end point
|
||||
:param out_ep: Output end point
|
||||
"""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
self.timeout = timeout
|
||||
self.in_ep = in_ep
|
||||
self.out_ep = out_ep
|
||||
|
||||
usb_args = usb_args or {}
|
||||
if idVendor:
|
||||
usb_args["idVendor"] = idVendor
|
||||
if idProduct:
|
||||
usb_args["idProduct"] = idProduct
|
||||
self.open(usb_args)
|
||||
|
||||
def open(self, usb_args):
|
||||
"""Search device on USB tree and set it as escpos device.
|
||||
|
||||
:param usb_args: USB arguments
|
||||
"""
|
||||
self.device = usb.core.find(**usb_args)
|
||||
if self.device is None:
|
||||
raise USBNotFoundError("Device not found or cable not plugged in.")
|
||||
|
||||
self.idVendor = self.device.idVendor
|
||||
self.idProduct = self.device.idProduct
|
||||
|
||||
# pyusb has three backends: libusb0, libusb1 and openusb but
|
||||
# only libusb1 backend implements the methods is_kernel_driver_active()
|
||||
# and detach_kernel_driver().
|
||||
# This helps enable this library to work on Windows.
|
||||
if self.device.backend.__module__.endswith("libusb1"):
|
||||
check_driver = None
|
||||
|
||||
try:
|
||||
check_driver = self.device.is_kernel_driver_active(0)
|
||||
except NotImplementedError:
|
||||
pass
|
||||
|
||||
if check_driver is None or check_driver:
|
||||
try:
|
||||
self.device.detach_kernel_driver(0)
|
||||
except NotImplementedError:
|
||||
pass
|
||||
except usb.core.USBError as e:
|
||||
if check_driver is not None:
|
||||
print("Could not detatch kernel driver: {0}".format(str(e)))
|
||||
|
||||
try:
|
||||
self.device.set_configuration()
|
||||
self.device.reset()
|
||||
except usb.core.USBError as e:
|
||||
print("Could not set configuration: {0}".format(str(e)))
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Print any command sent in raw format.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
self.device.write(self.out_ep, msg, self.timeout)
|
||||
|
||||
def _read(self):
|
||||
"""Read a data buffer and return it to the caller."""
|
||||
return self.device.read(self.in_ep, 16)
|
||||
|
||||
def close(self):
|
||||
"""Release USB interface."""
|
||||
if self.device:
|
||||
usb.util.dispose_resources(self.device)
|
||||
self.device = None
|
||||
|
||||
|
||||
class Serial(Escpos):
|
||||
"""Serial printer.
|
||||
|
||||
This class describes a printer that is connected by serial interface.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.printer.Serial
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
devfile="/dev/ttyS0",
|
||||
baudrate=9600,
|
||||
bytesize=8,
|
||||
timeout=1,
|
||||
parity=serial.PARITY_NONE,
|
||||
stopbits=serial.STOPBITS_ONE,
|
||||
xonxoff=False,
|
||||
dsrdtr=True,
|
||||
*args,
|
||||
**kwargs
|
||||
):
|
||||
"""Initialize serial printer.
|
||||
|
||||
:param devfile: Device file under dev filesystem
|
||||
:param baudrate: Baud rate for serial transmission
|
||||
:param bytesize: Serial buffer size
|
||||
:param timeout: Read/Write timeout
|
||||
:param parity: Parity checking
|
||||
:param stopbits: Number of stop bits
|
||||
:param xonxoff: Software flow control
|
||||
:param dsrdtr: Hardware flow control (False to enable RTS/CTS)
|
||||
"""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
self.devfile = devfile
|
||||
self.baudrate = baudrate
|
||||
self.bytesize = bytesize
|
||||
self.timeout = timeout
|
||||
self.parity = parity
|
||||
self.stopbits = stopbits
|
||||
self.xonxoff = xonxoff
|
||||
self.dsrdtr = dsrdtr
|
||||
|
||||
self.open()
|
||||
|
||||
def open(self):
|
||||
"""Set up serial port and set is as escpos device."""
|
||||
if self.device is not None and self.device.is_open:
|
||||
self.close()
|
||||
self.device = serial.Serial(
|
||||
port=self.devfile,
|
||||
baudrate=self.baudrate,
|
||||
bytesize=self.bytesize,
|
||||
parity=self.parity,
|
||||
stopbits=self.stopbits,
|
||||
timeout=self.timeout,
|
||||
xonxoff=self.xonxoff,
|
||||
dsrdtr=self.dsrdtr,
|
||||
)
|
||||
|
||||
if self.device is not None:
|
||||
print("Serial printer enabled")
|
||||
else:
|
||||
print("Unable to open serial printer on: {0}".format(str(self.devfile)))
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Print any command sent in raw format.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
self.device.write(msg)
|
||||
|
||||
def _read(self):
|
||||
"""Read the data buffer and return it to the caller."""
|
||||
return self.device.read(16)
|
||||
|
||||
def close(self):
|
||||
"""Close Serial interface."""
|
||||
if self.device is not None and self.device.is_open:
|
||||
self.device.flush()
|
||||
self.device.close()
|
||||
|
||||
|
||||
class Network(Escpos):
|
||||
"""Network printer.
|
||||
|
||||
This class is used to attach to a networked printer. You can also use this in order to attach to a printer that
|
||||
is forwarded with ``socat``.
|
||||
|
||||
If you have a local printer on parallel port ``/dev/usb/lp0`` then you could start ``socat`` with:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
socat -u TCP4-LISTEN:4242,reuseaddr,fork OPEN:/dev/usb/lp0
|
||||
|
||||
Then you should be able to attach to port ``4242`` with this class.
|
||||
Otherwise the normal usecase would be to have a printer with ethernet interface. This type of printer should
|
||||
work the same with this class. For the address of the printer check its manuals.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.printer.Network
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, host, port=9100, timeout=60, *args, **kwargs):
|
||||
"""Initialize network printer.
|
||||
|
||||
:param host: Printer's hostname or IP address
|
||||
:param port: Port to write to
|
||||
:param timeout: timeout in seconds for the socket-library
|
||||
"""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.timeout = timeout
|
||||
self.open()
|
||||
|
||||
def open(self):
|
||||
"""Open TCP socket with ``socket``-library and set it as escpos device."""
|
||||
self.device = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.device.settimeout(self.timeout)
|
||||
self.device.connect((self.host, self.port))
|
||||
|
||||
if self.device is None:
|
||||
print("Could not open socket for {0}".format(self.host))
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Print any command sent in raw format.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
self.device.sendall(msg)
|
||||
|
||||
def _read(self):
|
||||
"""Read data from the TCP socket."""
|
||||
return self.device.recv(16)
|
||||
|
||||
def close(self):
|
||||
"""Close TCP connection."""
|
||||
if self.device is not None:
|
||||
try:
|
||||
self.device.shutdown(socket.SHUT_RDWR)
|
||||
except socket.error:
|
||||
pass
|
||||
self.device.close()
|
||||
|
||||
|
||||
class File(Escpos):
|
||||
"""Generic file printer.
|
||||
|
||||
This class is used for parallel port printer or other printers that are directly attached to the filesystem.
|
||||
Note that you should stay away from using USB-to-Parallel-Adapter since they are unreliable
|
||||
and produce arbitrary errors.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.printer.File
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, devfile="/dev/usb/lp0", auto_flush=True, *args, **kwargs):
|
||||
"""Initialize file printer with device file.
|
||||
|
||||
:param devfile: Device file under dev filesystem
|
||||
:param auto_flush: automatically call flush after every call of _raw()
|
||||
"""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
self.devfile = devfile
|
||||
self.auto_flush = auto_flush
|
||||
self.open()
|
||||
|
||||
def open(self):
|
||||
"""Open system file."""
|
||||
self.device = open(self.devfile, "wb")
|
||||
|
||||
if self.device is None:
|
||||
print("Could not open the specified file {0}".format(self.devfile))
|
||||
|
||||
def flush(self):
|
||||
"""Flush printing content."""
|
||||
self.device.flush()
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Print any command sent in raw format.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
self.device.write(msg)
|
||||
if self.auto_flush:
|
||||
self.flush()
|
||||
|
||||
def close(self):
|
||||
"""Close system file."""
|
||||
if self.device is not None:
|
||||
self.device.flush()
|
||||
self.device.close()
|
||||
|
||||
|
||||
class Dummy(Escpos):
|
||||
"""Dummy printer.
|
||||
|
||||
This class is used for saving commands to a variable, for use in situations where
|
||||
there is no need to send commands to an actual printer. This includes
|
||||
generating print jobs for later use, or testing output.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.printer.Dummy
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""Init with empty output list."""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
self._output_list = []
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Print any command sent in raw format.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
self._output_list.append(msg)
|
||||
|
||||
@property
|
||||
def output(self):
|
||||
"""Get the data that was sent to this printer."""
|
||||
return b"".join(self._output_list)
|
||||
|
||||
def clear(self):
|
||||
"""Clear the buffer of the printer.
|
||||
|
||||
This method can be called if you send the contents to a physical printer
|
||||
and want to use the Dummy printer for new output.
|
||||
"""
|
||||
del self._output_list[:]
|
||||
|
||||
def close(self):
|
||||
"""Close not implemented for Dummy printer."""
|
||||
pass
|
||||
|
||||
|
||||
if _WIN32PRINT:
|
||||
|
||||
class Win32Raw(Escpos):
|
||||
"""Printer binding for win32 API.
|
||||
|
||||
Uses the module pywin32 for printing.
|
||||
"""
|
||||
|
||||
def __init__(self, printer_name=None, *args, **kwargs):
|
||||
"""Initialize default printer."""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
if printer_name is not None:
|
||||
self.printer_name = printer_name
|
||||
else:
|
||||
self.printer_name = win32print.GetDefaultPrinter()
|
||||
self.hPrinter = None
|
||||
self.open()
|
||||
|
||||
def open(self, job_name="python-escpos"):
|
||||
"""Open connection to default printer."""
|
||||
if self.printer_name is None:
|
||||
raise Exception("Printer not found")
|
||||
self.hPrinter = win32print.OpenPrinter(self.printer_name)
|
||||
self.current_job = win32print.StartDocPrinter(
|
||||
self.hPrinter, 1, (job_name, None, "RAW")
|
||||
)
|
||||
win32print.StartPagePrinter(self.hPrinter)
|
||||
|
||||
def close(self):
|
||||
"""Close connection to default printer."""
|
||||
if not self.hPrinter:
|
||||
return
|
||||
win32print.EndPagePrinter(self.hPrinter)
|
||||
win32print.EndDocPrinter(self.hPrinter)
|
||||
win32print.ClosePrinter(self.hPrinter)
|
||||
self.hPrinter = None
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Print any command sent in raw format.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
if self.printer_name is None:
|
||||
raise Exception("Printer not found")
|
||||
if self.hPrinter is None:
|
||||
raise Exception("Printer job not opened")
|
||||
win32print.WritePrinter(self.hPrinter, msg)
|
||||
|
||||
|
||||
if _CUPSPRINT:
|
||||
|
||||
class CupsPrinter(Escpos):
|
||||
"""Simple CUPS printer connector.
|
||||
|
||||
.. note::
|
||||
|
||||
Requires ``pycups`` which in turn needs the cups development library package:
|
||||
- Ubuntu/Debian: ``libcups2-dev``
|
||||
- OpenSuse/Fedora: ``cups-devel``
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, printer_name=None, *args, **kwargs):
|
||||
"""Class constructor for CupsPrinter.
|
||||
|
||||
:param printer_name: CUPS printer name (Optional)
|
||||
:type printer_name: str
|
||||
:param host: CUPS server host/ip (Optional)
|
||||
:type host: str
|
||||
:param port: CUPS server port (Optional)
|
||||
:type port: int
|
||||
"""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
host, port = args or (
|
||||
kwargs.get("host", cups.getServer()),
|
||||
kwargs.get("port", cups.getPort()),
|
||||
)
|
||||
cups.setServer(host)
|
||||
cups.setPort(port)
|
||||
self.conn = cups.Connection()
|
||||
self.tmpfile = None
|
||||
self.printer_name = printer_name
|
||||
self.job_name = ""
|
||||
self.pending_job = False
|
||||
self.open()
|
||||
|
||||
@property
|
||||
def printers(self):
|
||||
"""Available CUPS printers."""
|
||||
return self.conn.getPrinters()
|
||||
|
||||
def open(self, job_name="python-escpos"):
|
||||
"""Set up a new print job and target the printer.
|
||||
|
||||
A call to this method is required to send new jobs to
|
||||
the same CUPS connection.
|
||||
|
||||
Defaults to default CUPS printer.
|
||||
Creates a new temporary file buffer.
|
||||
"""
|
||||
self.job_name = job_name
|
||||
if self.printer_name not in self.printers:
|
||||
self.printer_name = self.conn.getDefault()
|
||||
self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Append any command sent in raw format to temporary file.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
self.pending_job = True
|
||||
try:
|
||||
self.tmpfile.write(msg)
|
||||
except ValueError:
|
||||
self.pending_job = False
|
||||
raise ValueError("Printer job not opened")
|
||||
|
||||
def send(self):
|
||||
"""Send the print job to the printer."""
|
||||
if self.pending_job:
|
||||
# Rewind tempfile
|
||||
self.tmpfile.seek(0)
|
||||
# Print temporary file via CUPS printer.
|
||||
self.conn.printFile(
|
||||
self.printer_name,
|
||||
self.tmpfile.name,
|
||||
self.job_name,
|
||||
{"document-format": cups.CUPS_FORMAT_RAW},
|
||||
)
|
||||
self._clear()
|
||||
|
||||
def _clear(self):
|
||||
"""Finish the print job.
|
||||
|
||||
Remove temporary file.
|
||||
"""
|
||||
self.tmpfile.close()
|
||||
self.pending_job = False
|
||||
|
||||
def _read(self):
|
||||
"""Return a single-item array with the accepting state of the print queue.
|
||||
|
||||
states: idle = [3], printing a job = [4], stopped = [5]
|
||||
"""
|
||||
printer = self.printers.get(self.printer_name, {})
|
||||
state = printer.get("printer-state")
|
||||
if not state:
|
||||
return []
|
||||
return [state]
|
||||
|
||||
def close(self):
|
||||
"""Close CUPS connection.
|
||||
|
||||
Send pending job to the printer if needed.
|
||||
"""
|
||||
if self.pending_job:
|
||||
self.send()
|
||||
if self.conn:
|
||||
print("Closing CUPS connection to printer {}".format(self.printer_name))
|
||||
self.conn = None
|
||||
|
||||
|
||||
if not sys.platform.startswith("win"):
|
||||
|
||||
class LP(Escpos):
|
||||
"""Simple UNIX lp command raw printing.
|
||||
|
||||
Thanks to `Oyami-Srk comment <https://github.com/python-escpos/python-escpos/pull/348#issuecomment-549558316>`_.
|
||||
"""
|
||||
|
||||
def __init__(self, printer_name: str, *args, **kwargs):
|
||||
"""LP class constructor.
|
||||
|
||||
:param printer_name: CUPS printer name (Optional)
|
||||
:type printer_name: str
|
||||
:param auto_flush: Automatic flush after every _raw() (Optional)
|
||||
:type auto_flush: bool
|
||||
"""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
self.printer_name = printer_name
|
||||
self.auto_flush = kwargs.get("auto_flush", True)
|
||||
self.open()
|
||||
|
||||
def open(self):
|
||||
"""Invoke _lp_ in a new subprocess and wait for commands."""
|
||||
self.lp = subprocess.Popen(
|
||||
["lp", "-d", self.printer_name, "-o", "raw"],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=open(os.devnull, "w"),
|
||||
)
|
||||
|
||||
def close(self):
|
||||
"""Stop the subprocess."""
|
||||
self.lp.terminate()
|
||||
|
||||
def flush(self):
|
||||
"""End line and wait for new commands."""
|
||||
if self.lp.stdin.writable():
|
||||
self.lp.stdin.write(b"\n")
|
||||
if self.lp.stdin.closed is False:
|
||||
self.lp.stdin.close()
|
||||
self.lp.wait()
|
||||
self.open()
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Write raw command(s) to the printer.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
if self.lp.stdin.writable():
|
||||
self.lp.stdin.write(msg)
|
||||
else:
|
||||
raise Exception("Not a valid pipe for lp process")
|
||||
if self.auto_flush:
|
||||
self.flush()
|
|
@ -0,0 +1,22 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""printer implementations."""
|
||||
|
||||
from .cups import CupsPrinter
|
||||
from .dummy import Dummy
|
||||
from .file import File
|
||||
from .lp import LP
|
||||
from .network import Network
|
||||
from .serial import Serial
|
||||
from .usb import Usb
|
||||
from .win32raw import Win32Raw
|
||||
|
||||
__all__ = [
|
||||
"Usb",
|
||||
"File",
|
||||
"Network",
|
||||
"Serial",
|
||||
"LP",
|
||||
"Dummy",
|
||||
"CupsPrinter",
|
||||
"Win32Raw",
|
||||
]
|
|
@ -0,0 +1,180 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""This module contains the implementation of the CupsPrinter printer driver.
|
||||
|
||||
:author: python-escpos developers
|
||||
:organization: `python-escpos <https://github.com/python-escpos>`_
|
||||
:copyright: Copyright (c) 2012-2023 Bashlinux and python-escpos
|
||||
:license: MIT
|
||||
"""
|
||||
|
||||
import functools
|
||||
import tempfile
|
||||
|
||||
from ..escpos import Escpos
|
||||
|
||||
#: keeps track if the pycups dependency could be loaded (:py:class:`escpos.printer.CupsPrinter`)
|
||||
_DEP_PYCUPS = False
|
||||
|
||||
try:
|
||||
import cups
|
||||
|
||||
_DEP_PYCUPS = True
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
# TODO: dev build mode that let's the wrapper bypass?
|
||||
|
||||
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this component can be used due to dependencies."""
|
||||
usable = False
|
||||
if _DEP_PYCUPS:
|
||||
usable = True
|
||||
return usable
|
||||
|
||||
|
||||
def dependency_pycups(func):
|
||||
"""Indicate dependency on pycups."""
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
"""Throw a RuntimeError if pycups is not imported."""
|
||||
if not is_usable():
|
||||
raise RuntimeError(
|
||||
"Printing with PyCups requires the pycups library to"
|
||||
"be installed. Please refer to the documentation on"
|
||||
"what to install and install the dependencies for pycups."
|
||||
)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class CupsPrinter(Escpos):
|
||||
"""Simple CUPS printer connector.
|
||||
|
||||
.. note::
|
||||
|
||||
Requires ``pycups`` which in turn needs the cups development library package:
|
||||
- Ubuntu/Debian: ``libcups2-dev``
|
||||
- OpenSuse/Fedora: ``cups-devel``
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.printer.CupsPrinter
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this printer class is usable.
|
||||
|
||||
Will return True if dependencies are available.
|
||||
Will return False if not.
|
||||
"""
|
||||
return is_usable()
|
||||
|
||||
@dependency_pycups
|
||||
def __init__(self, printer_name=None, *args, **kwargs):
|
||||
"""Class constructor for CupsPrinter.
|
||||
|
||||
:param printer_name: CUPS printer name (Optional)
|
||||
:type printer_name: str
|
||||
:param host: CUPS server host/ip (Optional)
|
||||
:type host: str
|
||||
:param port: CUPS server port (Optional)
|
||||
:type port: int
|
||||
"""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
host, port = args or (
|
||||
kwargs.get("host", cups.getServer()),
|
||||
kwargs.get("port", cups.getPort()),
|
||||
)
|
||||
cups.setServer(host)
|
||||
cups.setPort(port)
|
||||
self.conn = cups.Connection()
|
||||
self.tmpfile = None
|
||||
self.printer_name = printer_name
|
||||
self.job_name = ""
|
||||
self.pending_job = False
|
||||
self.open()
|
||||
|
||||
@property
|
||||
def printers(self):
|
||||
"""Available CUPS printers."""
|
||||
return self.conn.getPrinters()
|
||||
|
||||
def open(self, job_name="python-escpos"):
|
||||
"""Set up a new print job and target the printer.
|
||||
|
||||
A call to this method is required to send new jobs to
|
||||
the same CUPS connection.
|
||||
|
||||
Defaults to default CUPS printer.
|
||||
Creates a new temporary file buffer.
|
||||
"""
|
||||
self.job_name = job_name
|
||||
if self.printer_name not in self.printers:
|
||||
self.printer_name = self.conn.getDefault()
|
||||
self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Append any command sent in raw format to temporary file.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
self.pending_job = True
|
||||
try:
|
||||
self.tmpfile.write(msg)
|
||||
except ValueError:
|
||||
self.pending_job = False
|
||||
raise ValueError("Printer job not opened")
|
||||
|
||||
@dependency_pycups
|
||||
def send(self):
|
||||
"""Send the print job to the printer."""
|
||||
if self.pending_job:
|
||||
# Rewind tempfile
|
||||
self.tmpfile.seek(0)
|
||||
# Print temporary file via CUPS printer.
|
||||
self.conn.printFile(
|
||||
self.printer_name,
|
||||
self.tmpfile.name,
|
||||
self.job_name,
|
||||
{"document-format": cups.CUPS_FORMAT_RAW},
|
||||
)
|
||||
self._clear()
|
||||
|
||||
def _clear(self):
|
||||
"""Finish the print job.
|
||||
|
||||
Remove temporary file.
|
||||
"""
|
||||
self.tmpfile.close()
|
||||
self.pending_job = False
|
||||
|
||||
def _read(self):
|
||||
"""Return a single-item array with the accepting state of the print queue.
|
||||
|
||||
states: idle = [3], printing a job = [4], stopped = [5]
|
||||
"""
|
||||
printer = self.printers.get(self.printer_name, {})
|
||||
state = printer.get("printer-state")
|
||||
if not state:
|
||||
return []
|
||||
return [state]
|
||||
|
||||
def close(self):
|
||||
"""Close CUPS connection.
|
||||
|
||||
Send pending job to the printer if needed.
|
||||
"""
|
||||
if self.pending_job:
|
||||
self.send()
|
||||
if self.conn:
|
||||
print("Closing CUPS connection to printer {}".format(self.printer_name))
|
||||
self.conn = None
|
|
@ -0,0 +1,70 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""This module contains the implementation of the CupsPrinter printer driver.
|
||||
|
||||
:author: python-escpos developers
|
||||
:organization: `python-escpos <https://github.com/python-escpos>`_
|
||||
:copyright: Copyright (c) 2012-2023 Bashlinux and python-escpos
|
||||
:license: MIT
|
||||
"""
|
||||
|
||||
from ..escpos import Escpos
|
||||
|
||||
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this component can be used due to dependencies."""
|
||||
return True
|
||||
|
||||
|
||||
class Dummy(Escpos):
|
||||
"""Dummy printer.
|
||||
|
||||
This class is used for saving commands to a variable, for use in situations where
|
||||
there is no need to send commands to an actual printer. This includes
|
||||
generating print jobs for later use, or testing output.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.printer.Dummy
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this printer class is usable.
|
||||
|
||||
Will return True if dependencies are available.
|
||||
Will return False if not.
|
||||
"""
|
||||
return is_usable()
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""Init with empty output list."""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
self._output_list = []
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Print any command sent in raw format.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
self._output_list.append(msg)
|
||||
|
||||
@property
|
||||
def output(self):
|
||||
"""Get the data that was sent to this printer."""
|
||||
return b"".join(self._output_list)
|
||||
|
||||
def clear(self):
|
||||
"""Clear the buffer of the printer.
|
||||
|
||||
This method can be called if you send the contents to a physical printer
|
||||
and want to use the Dummy printer for new output.
|
||||
"""
|
||||
del self._output_list[:]
|
||||
|
||||
def close(self):
|
||||
"""Close not implemented for Dummy printer."""
|
||||
pass
|
|
@ -0,0 +1,78 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""This module contains the implementation of the CupsPrinter printer driver.
|
||||
|
||||
:author: python-escpos developers
|
||||
:organization: `python-escpos <https://github.com/python-escpos>`_
|
||||
:copyright: Copyright (c) 2012-2023 Bashlinux and python-escpos
|
||||
:license: MIT
|
||||
"""
|
||||
|
||||
from ..escpos import Escpos
|
||||
|
||||
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this component can be used due to dependencies."""
|
||||
return True
|
||||
|
||||
|
||||
class File(Escpos):
|
||||
"""Generic file printer.
|
||||
|
||||
This class is used for parallel port printer or other printers that are directly attached to the filesystem.
|
||||
Note that you should stay away from using USB-to-Parallel-Adapter since they are unreliable
|
||||
and produce arbitrary errors.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.printer.File
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this printer class is usable.
|
||||
|
||||
Will return True if dependencies are available.
|
||||
Will return False if not.
|
||||
"""
|
||||
return is_usable()
|
||||
|
||||
def __init__(self, devfile="/dev/usb/lp0", auto_flush=True, *args, **kwargs):
|
||||
"""Initialize file printer with device file.
|
||||
|
||||
:param devfile: Device file under dev filesystem
|
||||
:param auto_flush: automatically call flush after every call of _raw()
|
||||
"""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
self.devfile = devfile
|
||||
self.auto_flush = auto_flush
|
||||
self.open()
|
||||
|
||||
def open(self):
|
||||
"""Open system file."""
|
||||
self.device = open(self.devfile, "wb")
|
||||
|
||||
if self.device is None:
|
||||
print("Could not open the specified file {0}".format(self.devfile))
|
||||
|
||||
def flush(self):
|
||||
"""Flush printing content."""
|
||||
self.device.flush()
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Print any command sent in raw format.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
self.device.write(msg)
|
||||
if self.auto_flush:
|
||||
self.flush()
|
||||
|
||||
def close(self):
|
||||
"""Close system file."""
|
||||
if self.device is not None:
|
||||
self.device.flush()
|
||||
self.device.close()
|
|
@ -0,0 +1,110 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""This module contains the implementation of the CupsPrinter printer driver.
|
||||
|
||||
:author: python-escpos developers
|
||||
:organization: `python-escpos <https://github.com/python-escpos>`_
|
||||
:copyright: Copyright (c) 2012-2023 Bashlinux and python-escpos
|
||||
:license: MIT
|
||||
"""
|
||||
|
||||
import functools
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from ..escpos import Escpos
|
||||
|
||||
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this component can be used due to dependencies."""
|
||||
usable = False
|
||||
if sys.platform.startswith("win"):
|
||||
usable = True
|
||||
return usable
|
||||
|
||||
|
||||
def dependency_linux_lp(func):
|
||||
"""Indicate dependency on non Windows."""
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
"""Throw a RuntimeError if not on a non-Windows system."""
|
||||
if not is_usable():
|
||||
raise RuntimeError(
|
||||
"This printer driver depends on LP which is not"
|
||||
"available on Windows systems."
|
||||
)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class LP(Escpos):
|
||||
"""Simple UNIX lp command raw printing.
|
||||
|
||||
Thanks to `Oyami-Srk comment <https://github.com/python-escpos/python-escpos/pull/348#issuecomment-549558316>`_.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.printer.LP
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this printer class is usable.
|
||||
|
||||
Will return True if dependencies are available.
|
||||
Will return False if not.
|
||||
"""
|
||||
return is_usable()
|
||||
|
||||
def __init__(self, printer_name: str, *args, **kwargs):
|
||||
"""LP class constructor.
|
||||
|
||||
:param printer_name: CUPS printer name (Optional)
|
||||
:type printer_name: str
|
||||
:param auto_flush: Automatic flush after every _raw() (Optional)
|
||||
:type auto_flush: bool
|
||||
"""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
self.printer_name = printer_name
|
||||
self.auto_flush = kwargs.get("auto_flush", True)
|
||||
self.open()
|
||||
|
||||
@dependency_linux_lp
|
||||
def open(self):
|
||||
"""Invoke _lp_ in a new subprocess and wait for commands."""
|
||||
self.lp = subprocess.Popen(
|
||||
["lp", "-d", self.printer_name, "-o", "raw"],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=open(os.devnull, "w"),
|
||||
)
|
||||
|
||||
def close(self):
|
||||
"""Stop the subprocess."""
|
||||
self.lp.terminate()
|
||||
|
||||
def flush(self):
|
||||
"""End line and wait for new commands."""
|
||||
if self.lp.stdin.writable():
|
||||
self.lp.stdin.write(b"\n")
|
||||
if self.lp.stdin.closed is False:
|
||||
self.lp.stdin.close()
|
||||
self.lp.wait()
|
||||
self.open()
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Write raw command(s) to the printer.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
if self.lp.stdin.writable():
|
||||
self.lp.stdin.write(msg)
|
||||
else:
|
||||
raise Exception("Not a valid pipe for lp process")
|
||||
if self.auto_flush:
|
||||
self.flush()
|
|
@ -0,0 +1,94 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""This module contains the implementation of the CupsPrinter printer driver.
|
||||
|
||||
:author: python-escpos developers
|
||||
:organization: `python-escpos <https://github.com/python-escpos>`_
|
||||
:copyright: Copyright (c) 2012-2023 Bashlinux and python-escpos
|
||||
:license: MIT
|
||||
"""
|
||||
|
||||
import socket
|
||||
|
||||
from ..escpos import Escpos
|
||||
|
||||
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this component can be used due to dependencies."""
|
||||
return True
|
||||
|
||||
|
||||
class Network(Escpos):
|
||||
"""Network printer.
|
||||
|
||||
This class is used to attach to a networked printer. You can also use this in order to attach to a printer that
|
||||
is forwarded with ``socat``.
|
||||
|
||||
If you have a local printer on parallel port ``/dev/usb/lp0`` then you could start ``socat`` with:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
socat -u TCP4-LISTEN:4242,reuseaddr,fork OPEN:/dev/usb/lp0
|
||||
|
||||
Then you should be able to attach to port ``4242`` with this class.
|
||||
Otherwise the normal usecase would be to have a printer with ethernet interface. This type of printer should
|
||||
work the same with this class. For the address of the printer check its manuals.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.printer.Network
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this printer class is usable.
|
||||
|
||||
Will return True if dependencies are available.
|
||||
Will return False if not.
|
||||
"""
|
||||
return is_usable()
|
||||
|
||||
def __init__(self, host, port=9100, timeout=60, *args, **kwargs):
|
||||
"""Initialize network printer.
|
||||
|
||||
:param host: Printer's hostname or IP address
|
||||
:param port: Port to write to
|
||||
:param timeout: timeout in seconds for the socket-library
|
||||
"""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.timeout = timeout
|
||||
self.open()
|
||||
|
||||
def open(self):
|
||||
"""Open TCP socket with ``socket``-library and set it as escpos device."""
|
||||
self.device = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.device.settimeout(self.timeout)
|
||||
self.device.connect((self.host, self.port))
|
||||
|
||||
if self.device is None:
|
||||
print("Could not open socket for {0}".format(self.host))
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Print any command sent in raw format.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
self.device.sendall(msg)
|
||||
|
||||
def _read(self):
|
||||
"""Read data from the TCP socket."""
|
||||
return self.device.recv(16)
|
||||
|
||||
def close(self):
|
||||
"""Close TCP connection."""
|
||||
if self.device is not None:
|
||||
try:
|
||||
self.device.shutdown(socket.SHUT_RDWR)
|
||||
except socket.error:
|
||||
pass
|
||||
self.device.close()
|
|
@ -0,0 +1,153 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""This module contains the implementation of the CupsPrinter printer driver.
|
||||
|
||||
:author: python-escpos developers
|
||||
:organization: `python-escpos <https://github.com/python-escpos>`_
|
||||
:copyright: Copyright (c) 2012-2023 Bashlinux and python-escpos
|
||||
:license: MIT
|
||||
"""
|
||||
|
||||
|
||||
import functools
|
||||
|
||||
from ..escpos import Escpos
|
||||
|
||||
#: keeps track if the pyserial dependency could be loaded (:py:class:`escpos.printer.Serial`)
|
||||
_DEP_PYSERIAL = False
|
||||
|
||||
try:
|
||||
import serial
|
||||
|
||||
_DEP_PYSERIAL = True
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this component can be used due to dependencies."""
|
||||
usable = False
|
||||
if _DEP_PYSERIAL:
|
||||
usable = True
|
||||
return usable
|
||||
|
||||
|
||||
def dependency_pyserial(func):
|
||||
"""Indicate dependency on pyserial."""
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
"""Throw a RuntimeError if pyserial not installed."""
|
||||
if not is_usable():
|
||||
raise RuntimeError(
|
||||
"Printing with Serial requires the pyserial library to"
|
||||
"be installed. Please refer to the documentation on"
|
||||
"what to install and install the dependencies for pyserial."
|
||||
)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class Serial(Escpos):
|
||||
"""Serial printer.
|
||||
|
||||
This class describes a printer that is connected by serial interface.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.printer.Serial
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this printer class is usable.
|
||||
|
||||
Will return True if dependencies are available.
|
||||
Will return False if not.
|
||||
"""
|
||||
return is_usable()
|
||||
|
||||
@dependency_pyserial
|
||||
def __init__(
|
||||
self,
|
||||
devfile="/dev/ttyS0",
|
||||
baudrate=9600,
|
||||
bytesize=8,
|
||||
timeout=1,
|
||||
parity=None,
|
||||
stopbits=None,
|
||||
xonxoff=False,
|
||||
dsrdtr=True,
|
||||
*args,
|
||||
**kwargs
|
||||
):
|
||||
"""Initialize serial printer.
|
||||
|
||||
:param devfile: Device file under dev filesystem
|
||||
:param baudrate: Baud rate for serial transmission
|
||||
:param bytesize: Serial buffer size
|
||||
:param timeout: Read/Write timeout
|
||||
:param parity: Parity checking
|
||||
:param stopbits: Number of stop bits
|
||||
:param xonxoff: Software flow control
|
||||
:param dsrdtr: Hardware flow control (False to enable RTS/CTS)
|
||||
"""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
self.devfile = devfile
|
||||
self.baudrate = baudrate
|
||||
self.bytesize = bytesize
|
||||
self.timeout = timeout
|
||||
if parity:
|
||||
self.parity = parity
|
||||
else:
|
||||
self.parity = serial.PARITY_NONE
|
||||
if stopbits:
|
||||
self.stopbits = stopbits
|
||||
else:
|
||||
self.stopbits = serial.STOPBITS_ONE
|
||||
self.xonxoff = xonxoff
|
||||
self.dsrdtr = dsrdtr
|
||||
|
||||
self.open()
|
||||
|
||||
@dependency_pyserial
|
||||
def open(self):
|
||||
"""Set up serial port and set is as escpos device."""
|
||||
if self.device is not None and self.device.is_open:
|
||||
self.close()
|
||||
self.device = serial.Serial(
|
||||
port=self.devfile,
|
||||
baudrate=self.baudrate,
|
||||
bytesize=self.bytesize,
|
||||
parity=self.parity,
|
||||
stopbits=self.stopbits,
|
||||
timeout=self.timeout,
|
||||
xonxoff=self.xonxoff,
|
||||
dsrdtr=self.dsrdtr,
|
||||
)
|
||||
|
||||
if self.device is not None:
|
||||
print("Serial printer enabled")
|
||||
else:
|
||||
print("Unable to open serial printer on: {0}".format(str(self.devfile)))
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Print any command sent in raw format.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
self.device.write(msg)
|
||||
|
||||
def _read(self):
|
||||
"""Read the data buffer and return it to the caller."""
|
||||
return self.device.read(16)
|
||||
|
||||
def close(self):
|
||||
"""Close Serial interface."""
|
||||
if self.device is not None and self.device.is_open:
|
||||
self.device.flush()
|
||||
self.device.close()
|
|
@ -0,0 +1,162 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""This module contains the implementation of the USB printer driver.
|
||||
|
||||
:author: python-escpos developers
|
||||
:organization: `python-escpos <https://github.com/python-escpos>`_
|
||||
:copyright: Copyright (c) 2012-2023 Bashlinux and python-escpos
|
||||
:license: MIT
|
||||
"""
|
||||
import functools
|
||||
|
||||
from ..escpos import Escpos
|
||||
from ..exceptions import USBNotFoundError
|
||||
|
||||
#: keeps track if the usb dependency could be loaded (:py:class:`escpos.printer.Usb`)
|
||||
_DEP_USB = False
|
||||
|
||||
try:
|
||||
import usb.core
|
||||
import usb.util
|
||||
|
||||
_DEP_USB = True
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this component can be used due to dependencies."""
|
||||
usable = False
|
||||
if _DEP_USB:
|
||||
usable = True
|
||||
return usable
|
||||
|
||||
|
||||
def dependency_usb(func):
|
||||
"""Indicate dependency on usb."""
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
"""Throw a RuntimeError if usb not installed."""
|
||||
if not is_usable():
|
||||
raise RuntimeError(
|
||||
"Printing with USB connection requires a usb library to"
|
||||
"be installed. Please refer to the documentation on"
|
||||
"what to install and install the dependencies for USB."
|
||||
)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class Usb(Escpos):
|
||||
"""USB printer.
|
||||
|
||||
This class describes a printer that natively speaks USB.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.printer.Usb
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this printer class is usable.
|
||||
|
||||
Will return True if dependencies are available.
|
||||
Will return False if not.
|
||||
"""
|
||||
return is_usable()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
idVendor,
|
||||
idProduct,
|
||||
usb_args=None,
|
||||
timeout=0,
|
||||
in_ep=0x82,
|
||||
out_ep=0x01,
|
||||
*args,
|
||||
**kwargs
|
||||
):
|
||||
"""Initialize USB printer.
|
||||
|
||||
:param idVendor: Vendor ID
|
||||
:param idProduct: Product ID
|
||||
:param usb_args: Optional USB arguments (e.g. custom_match)
|
||||
:param timeout: Is the time limit of the USB operation. Default without timeout.
|
||||
:param in_ep: Input end point
|
||||
:param out_ep: Output end point
|
||||
"""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
self.timeout = timeout
|
||||
self.in_ep = in_ep
|
||||
self.out_ep = out_ep
|
||||
|
||||
usb_args = usb_args or {}
|
||||
if idVendor:
|
||||
usb_args["idVendor"] = idVendor
|
||||
if idProduct:
|
||||
usb_args["idProduct"] = idProduct
|
||||
self.open(usb_args)
|
||||
|
||||
@dependency_usb
|
||||
def open(self, usb_args):
|
||||
"""Search device on USB tree and set it as escpos device.
|
||||
|
||||
:param usb_args: USB arguments
|
||||
"""
|
||||
self.device = usb.core.find(**usb_args)
|
||||
if self.device is None:
|
||||
raise USBNotFoundError("Device not found or cable not plugged in.")
|
||||
|
||||
self.idVendor = self.device.idVendor
|
||||
self.idProduct = self.device.idProduct
|
||||
|
||||
# pyusb has three backends: libusb0, libusb1 and openusb but
|
||||
# only libusb1 backend implements the methods is_kernel_driver_active()
|
||||
# and detach_kernel_driver().
|
||||
# This helps enable this library to work on Windows.
|
||||
if self.device.backend.__module__.endswith("libusb1"):
|
||||
check_driver = None
|
||||
|
||||
try:
|
||||
check_driver = self.device.is_kernel_driver_active(0)
|
||||
except NotImplementedError:
|
||||
pass
|
||||
|
||||
if check_driver is None or check_driver:
|
||||
try:
|
||||
self.device.detach_kernel_driver(0)
|
||||
except NotImplementedError:
|
||||
pass
|
||||
except usb.core.USBError as e:
|
||||
if check_driver is not None:
|
||||
print("Could not detatch kernel driver: {0}".format(str(e)))
|
||||
|
||||
try:
|
||||
self.device.set_configuration()
|
||||
self.device.reset()
|
||||
except usb.core.USBError as e:
|
||||
print("Could not set configuration: {0}".format(str(e)))
|
||||
|
||||
def _raw(self, msg):
|
||||
"""Print any command sent in raw format.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
self.device.write(self.out_ep, msg, self.timeout)
|
||||
|
||||
def _read(self):
|
||||
"""Read a data buffer and return it to the caller."""
|
||||
return self.device.read(self.in_ep, 16)
|
||||
|
||||
@dependency_usb
|
||||
def close(self):
|
||||
"""Release USB interface."""
|
||||
if self.device:
|
||||
usb.util.dispose_resources(self.device)
|
||||
self.device = None
|
|
@ -0,0 +1,115 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""This module contains the implementation of the CupsPrinter printer driver.
|
||||
|
||||
:author: python-escpos developers
|
||||
:organization: `python-escpos <https://github.com/python-escpos>`_
|
||||
:copyright: Copyright (c) 2012-2023 Bashlinux and python-escpos
|
||||
:license: MIT
|
||||
"""
|
||||
|
||||
import functools
|
||||
|
||||
from ..escpos import Escpos
|
||||
|
||||
#: keeps track if the win32print dependency could be loaded (:py:class:`escpos.printer.Win32Raw`)
|
||||
_DEP_WIN32PRINT = False
|
||||
|
||||
try:
|
||||
import win32print
|
||||
|
||||
_DEP_WIN32PRINT = True
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this component can be used due to dependencies."""
|
||||
usable = False
|
||||
if _DEP_WIN32PRINT:
|
||||
usable = True
|
||||
return usable
|
||||
|
||||
|
||||
def dependency_win32print(func):
|
||||
"""Indicate dependency on win32print."""
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
"""Throw a RuntimeError if win32print not installed."""
|
||||
if not is_usable():
|
||||
raise RuntimeError(
|
||||
"Printing with Win32Raw requires a win32print library to"
|
||||
"be installed. Please refer to the documentation on"
|
||||
"what to install and install the dependencies for win32print."
|
||||
)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class Win32Raw(Escpos):
|
||||
"""Printer binding for win32 API.
|
||||
|
||||
Uses the module pywin32 for printing.
|
||||
|
||||
inheritance:
|
||||
|
||||
.. inheritance-diagram:: escpos.printer.Win32Raw
|
||||
:parts: 1
|
||||
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def is_usable() -> bool:
|
||||
"""Indicate whether this printer class is usable.
|
||||
|
||||
Will return True if dependencies are available.
|
||||
Will return False if not.
|
||||
"""
|
||||
return is_usable()
|
||||
|
||||
@dependency_win32print
|
||||
def __init__(self, printer_name=None, *args, **kwargs):
|
||||
"""Initialize default printer."""
|
||||
Escpos.__init__(self, *args, **kwargs)
|
||||
if printer_name is not None:
|
||||
self.printer_name = printer_name
|
||||
else:
|
||||
self.printer_name = win32print.GetDefaultPrinter()
|
||||
self.hPrinter = None
|
||||
self.open()
|
||||
|
||||
@dependency_win32print
|
||||
def open(self, job_name="python-escpos"):
|
||||
"""Open connection to default printer."""
|
||||
if self.printer_name is None:
|
||||
raise Exception("Printer not found")
|
||||
self.hPrinter = win32print.OpenPrinter(self.printer_name)
|
||||
self.current_job = win32print.StartDocPrinter(
|
||||
self.hPrinter, 1, (job_name, None, "RAW")
|
||||
)
|
||||
win32print.StartPagePrinter(self.hPrinter)
|
||||
|
||||
@dependency_win32print
|
||||
def close(self):
|
||||
"""Close connection to default printer."""
|
||||
if not self.hPrinter:
|
||||
return
|
||||
win32print.EndPagePrinter(self.hPrinter)
|
||||
win32print.EndDocPrinter(self.hPrinter)
|
||||
win32print.ClosePrinter(self.hPrinter)
|
||||
self.hPrinter = None
|
||||
|
||||
@dependency_win32print
|
||||
def _raw(self, msg):
|
||||
"""Print any command sent in raw format.
|
||||
|
||||
:param msg: arbitrary code to be printed
|
||||
:type msg: bytes
|
||||
"""
|
||||
if self.printer_name is None:
|
||||
raise Exception("Printer not found")
|
||||
if self.hPrinter is None:
|
||||
raise Exception("Printer job not opened")
|
||||
win32print.WritePrinter(self.hPrinter, msg)
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
import pytest
|
||||
|
||||
import escpos.printer as printer
|
||||
import escpos.escpos
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -48,7 +48,7 @@ import escpos.printer as printer
|
|||
],
|
||||
)
|
||||
def test_check_valid_barcode(bctype, data):
|
||||
assert printer.Escpos.check_barcode(bctype, data)
|
||||
assert escpos.escpos.Escpos.check_barcode(bctype, data)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -104,4 +104,4 @@ def test_check_valid_barcode(bctype, data):
|
|||
],
|
||||
)
|
||||
def test_check_invalid_barcode(bctype, data):
|
||||
assert not printer.Escpos.check_barcode(bctype, data)
|
||||
assert not escpos.escpos.Escpos.check_barcode(bctype, data)
|
||||
|
|
Loading…
Reference in New Issue