Separate open() and store connection in 'device'

This commit is contained in:
belono 2023-08-28 11:48:46 +02:00
parent 8763eba86c
commit d68eee08a7
7 changed files with 72 additions and 68 deletions

View File

@ -20,6 +20,9 @@ try:
import cups
_DEP_PYCUPS = True
# Store server defaults before further configuration
DEFAULT_HOST = cups.getServer()
DEFAULT_PORT = cups.getPort()
except ImportError:
pass
@ -89,37 +92,37 @@ class CupsPrinter(Escpos):
:type port: int
"""
Escpos.__init__(self, *args, **kwargs)
host, port = args or (
kwargs.get("host", cups.getServer()),
kwargs.get("port", cups.getPort()),
self.host, self.port = args or (
kwargs.get("host", DEFAULT_HOST),
kwargs.get("port", DEFAULT_PORT),
)
cups.setServer(host)
cups.setPort(port)
self.conn = cups.Connection()
self.tmpfile = None
self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
self.printer_name = printer_name
self.job_name = ""
self.pending_job = False
self.open()
@property
def printers(self):
"""Available CUPS printers."""
return self.conn.getPrinters()
return self.device.getPrinters()
def open(self, job_name="python-escpos"):
"""Set up a new print job and target the printer.
A call to this method is required to send new jobs to
the same CUPS connection.
the same CUPS connection after close.
Defaults to default CUPS printer.
Creates a new temporary file buffer.
"""
cups.setServer(self.host)
cups.setPort(self.port)
self.device = cups.Connection()
self.job_name = job_name
if self.printer_name not in self.printers:
self.printer_name = self.conn.getDefault()
self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
self.printer_name = self.device.getDefault()
if self.tmpfile.closed:
self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
def _raw(self, msg):
"""Append any command sent in raw format to temporary file.
@ -141,7 +144,7 @@ class CupsPrinter(Escpos):
# Rewind tempfile
self.tmpfile.seek(0)
# Print temporary file via CUPS printer.
self.conn.printFile(
self.device.printFile(
self.printer_name,
self.tmpfile.name,
self.job_name,
@ -173,8 +176,9 @@ class CupsPrinter(Escpos):
Send pending job to the printer if needed.
"""
if not self._device:
return
if self.pending_job:
self.send()
if self.conn:
print("Closing CUPS connection to printer {}".format(self.printer_name))
self.conn = None
print("Closing CUPS connection to printer {}".format(self.printer_name))
self.device = None

View File

@ -48,13 +48,12 @@ class File(Escpos):
Escpos.__init__(self, *args, **kwargs)
self.devfile = devfile
self.auto_flush = auto_flush
self.open()
def open(self):
"""Open system file."""
self.device = open(self.devfile, "wb")
if self.device is None:
if not self.device:
print("Could not open the specified file {0}".format(self.devfile))
def flush(self):
@ -73,6 +72,8 @@ class File(Escpos):
def close(self):
"""Close system file."""
if self.device is not None:
self.device.flush()
self.device.close()
if not self._device:
return
self.device.flush()
self.device.close()
self.device = None

View File

@ -72,12 +72,11 @@ class LP(Escpos):
Escpos.__init__(self, *args, **kwargs)
self.printer_name = printer_name
self.auto_flush = kwargs.get("auto_flush", True)
self.open()
@dependency_linux_lp
def open(self):
"""Invoke _lp_ in a new subprocess and wait for commands."""
self.lp = subprocess.Popen(
self.device = subprocess.Popen(
["lp", "-d", self.printer_name, "-o", "raw"],
stdin=subprocess.PIPE,
stdout=open(os.devnull, "w"),
@ -85,15 +84,18 @@ class LP(Escpos):
def close(self):
"""Stop the subprocess."""
self.lp.terminate()
if not self._device:
return
self.device.terminate()
self.device = None
def flush(self):
"""End line and wait for new commands."""
if self.lp.stdin.writable():
self.lp.stdin.write(b"\n")
if self.lp.stdin.closed is False:
self.lp.stdin.close()
self.lp.wait()
if self.device.stdin.writable():
self.device.stdin.write(b"\n")
if self.device.stdin.closed is False:
self.device.stdin.close()
self.device.wait()
self.open()
def _raw(self, msg):
@ -102,8 +104,8 @@ class LP(Escpos):
:param msg: arbitrary code to be printed
:type msg: bytes
"""
if self.lp.stdin.writable():
self.lp.stdin.write(msg)
if self.device.stdin.writable():
self.device.stdin.write(msg)
else:
raise Exception("Not a valid pipe for lp process")
if self.auto_flush:

View File

@ -61,7 +61,6 @@ class Network(Escpos):
self.host = host
self.port = port
self.timeout = timeout
self.open()
def open(self):
"""Open TCP socket with ``socket``-library and set it as escpos device."""
@ -69,7 +68,7 @@ class Network(Escpos):
self.device.settimeout(self.timeout)
self.device.connect((self.host, self.port))
if self.device is None:
if not self.device:
print("Could not open socket for {0}".format(self.host))
def _raw(self, msg):
@ -86,9 +85,11 @@ class Network(Escpos):
def close(self):
"""Close TCP connection."""
if self.device is not None:
try:
self.device.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
self.device.close()
if not self._device:
return
try:
self.device.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
self.device.close()
self.device = None

View File

@ -111,12 +111,10 @@ class Serial(Escpos):
self.xonxoff = xonxoff
self.dsrdtr = dsrdtr
self.open()
@dependency_pyserial
def open(self):
"""Set up serial port and set is as escpos device."""
if self.device is not None and self.device.is_open:
if self.device and self.device.is_open:
self.close()
self.device = serial.Serial(
port=self.devfile,
@ -129,7 +127,7 @@ class Serial(Escpos):
dsrdtr=self.dsrdtr,
)
if self.device is not None:
if self.device:
print("Serial printer enabled")
else:
print("Unable to open serial printer on: {0}".format(str(self.devfile)))
@ -148,6 +146,9 @@ class Serial(Escpos):
def close(self):
"""Close Serial interface."""
if self.device is not None and self.device.is_open:
if not self._device:
return
if self.device.is_open:
self.device.flush()
self.device.close()
self.device = None

View File

@ -95,20 +95,16 @@ class Usb(Escpos):
self.in_ep = in_ep
self.out_ep = out_ep
usb_args = usb_args or {}
self.usb_args = usb_args or {}
if idVendor:
usb_args["idVendor"] = idVendor
self.usb_args["idVendor"] = idVendor
if idProduct:
usb_args["idProduct"] = idProduct
self.open(usb_args)
self.usb_args["idProduct"] = idProduct
@dependency_usb
def open(self, usb_args):
"""Search device on USB tree and set it as escpos device.
:param usb_args: USB arguments
"""
self.device = usb.core.find(**usb_args)
def open(self):
"""Search device on USB tree and set it as escpos device."""
self.device = usb.core.find(**self.usb_args)
if self.device is None:
raise USBNotFoundError("Device not found or cable not plugged in.")
@ -157,6 +153,7 @@ class Usb(Escpos):
@dependency_usb
def close(self):
"""Release USB interface."""
if self.device:
usb.util.dispose_resources(self.device)
if not self._device:
return
usb.util.dispose_resources(self.device)
self.device = None

View File

@ -77,29 +77,27 @@ class Win32Raw(Escpos):
self.printer_name = printer_name
else:
self.printer_name = win32print.GetDefaultPrinter()
self.hPrinter = None
self.open()
@dependency_win32print
def open(self, job_name="python-escpos"):
"""Open connection to default printer."""
if self.printer_name is None:
raise Exception("Printer not found")
self.hPrinter = win32print.OpenPrinter(self.printer_name)
self.device = win32print.OpenPrinter(self.printer_name)
self.current_job = win32print.StartDocPrinter(
self.hPrinter, 1, (job_name, None, "RAW")
self.device, 1, (job_name, None, "RAW")
)
win32print.StartPagePrinter(self.hPrinter)
win32print.StartPagePrinter(self.device)
@dependency_win32print
def close(self):
"""Close connection to default printer."""
if not self.hPrinter:
if not self._device:
return
win32print.EndPagePrinter(self.hPrinter)
win32print.EndDocPrinter(self.hPrinter)
win32print.ClosePrinter(self.hPrinter)
self.hPrinter = None
win32print.EndPagePrinter(self.device)
win32print.EndDocPrinter(self.device)
win32print.ClosePrinter(self.device)
self.device = None
@dependency_win32print
def _raw(self, msg):
@ -110,6 +108,6 @@ class Win32Raw(Escpos):
"""
if self.printer_name is None:
raise Exception("Printer not found")
if self.hPrinter is None:
if not self.device:
raise Exception("Printer job not opened")
win32print.WritePrinter(self.hPrinter, msg)
win32print.WritePrinter(self.device, msg)