Add LP Connector - UNIX printing via lp command

This commit is contained in:
belono 2022-11-24 00:11:01 +01:00
parent 0b695eff79
commit 9a3057c8a8
1 changed files with 79 additions and 34 deletions

View File

@ -9,14 +9,35 @@
"""
import serial
import os
import socket
import subprocess
import sys
import serial
import usb.core
import usb.util
from .escpos import Escpos
from .exceptions import USBNotFoundError
_WIN32PRINT = False
try:
import win32print
_WIN32PRINT = True
except ImportError:
pass
_CUPSPRINT = False
try:
import cups
import tempfile
_CUPSPRINT = True
except ImportError:
pass
class Usb(Escpos):
"""USB printer
@ -371,14 +392,6 @@ class Dummy(Escpos):
pass
_WIN32PRINT = False
try:
import win32print
_WIN32PRINT = True
except ImportError:
pass
if _WIN32PRINT:
class Win32Raw(Escpos):
@ -421,18 +434,11 @@ if _WIN32PRINT:
win32print.WritePrinter(self.hPrinter, msg)
_CUPSPRINT = False
try:
import cups
import tempfile
_CUPSPRINT = True
except ImportError:
pass
if _CUPSPRINT:
class CupsPrinter(Escpos):
""" Simple CUPS printer connector.
"""
"""Simple CUPS printer connector."""
def __init__(self, printer_name=None, *args, **kwargs):
"""CupsPrinter constructor.
@ -445,26 +451,25 @@ if _CUPSPRINT:
"""
Escpos.__init__(self, *args, **kwargs)
host, port = args or (
kwargs.get('host', cups.getServer()),
kwargs.get('port', cups.getPort())
kwargs.get("host", cups.getServer()),
kwargs.get("port", cups.getPort()),
)
cups.setServer(host)
cups.setPort(port)
self.conn = cups.Connection()
self.tmpfile = None
self.printer_name = printer_name
self.job_name = ''
self.job_name = ""
self.pending_job = False
self.open()
@property
def printers(self):
""" Available CUPS printers.
"""
"""Available CUPS printers."""
return self.conn.getPrinters()
def open(self, job_name='python-escpos'):
""" Setup a new print job and target printer.
def open(self, job_name="python-escpos"):
"""Setup a new print job and target printer.
A call to this method is required to send new jobs to
the same CUPS connection.
@ -478,7 +483,7 @@ if _CUPSPRINT:
self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
def _raw(self, msg):
""" Append any command sent in raw format to temporary file
"""Append any command sent in raw format to temporary file
:param msg: arbitrary code to be printed
:type msg: bytes
@ -491,8 +496,7 @@ if _CUPSPRINT:
raise ValueError("Printer job not opened")
def send(self):
""" Send the print job to the printer.
"""
"""Send the print job to the printer."""
if self.pending_job:
# Rewind tempfile
self.tmpfile.seek(0)
@ -501,11 +505,12 @@ if _CUPSPRINT:
self.printer_name,
self.tmpfile.name,
self.job_name,
{"document-format": cups.CUPS_FORMAT_RAW})
{"document-format": cups.CUPS_FORMAT_RAW},
)
self._clear()
def _clear(self):
""" Finish the print job.
"""Finish the print job.
Remove temporary file.
"""
@ -513,13 +518,53 @@ if _CUPSPRINT:
self.pending_job = False
def close(self):
""" Close CUPS connection.
"""Close CUPS connection.
Send pending job to the printer if needed.
"""
if self.pending_job:
self.send()
if self.conn:
print('Closing CUPS connection to printer {}'.format(
self.printer_name))
print("Closing CUPS connection to printer {}".format(self.printer_name))
self.conn = None
if not sys.platform.startswith("win"):
class LP(Escpos):
"""Simple UNIX lp raw printing.
From https://github.com/python-escpos/python-escpos/pull/348#issuecomment-549558316
"""
def __init__(self, printer_name: str, *args, **kwargs):
Escpos.__init__(self, *args, **kwargs)
self.printer_name = printer_name
self.auto_flush = kwargs.get("auto_flush", True)
self.open()
def open(self):
self.lp = subprocess.Popen(
["lp", "-d", self.printer_name, "-o", "raw"],
stdin=subprocess.PIPE,
stdout=open(os.devnull, "w"),
)
def close(self):
self.lp.terminate()
def flush(self):
if self.lp.stdin.writable():
self.lp.stdin.write(b"\n")
if self.lp.stdin.closed is False:
self.lp.stdin.close()
self.lp.wait()
self.open()
def _raw(self, msg):
if self.lp.stdin.writable():
self.lp.stdin.write(msg)
else:
raise Exception("Not a valid pipe for lp process")
if self.auto_flush:
self.flush()