diff --git a/recipes/Python/305690_Enumerate_printer_job/recipe-305690.py b/recipes/Python/305690_Enumerate_printer_job/recipe-305690.py index f34449c27..2ddb350ba 100644 --- a/recipes/Python/305690_Enumerate_printer_job/recipe-305690.py +++ b/recipes/Python/305690_Enumerate_printer_job/recipe-305690.py @@ -1,5 +1,7 @@ import time -from ctypes import * +import ctypes as ct +from ctypes import wintypes as wt + """ This script enumerates printer jobs from a specified default printer. This information includes Jobid, Document name, @@ -18,7 +20,30 @@ license GPL """ -ws = WinDLL("winspool.drv") + +LPPRINTER_DEFAULTS = wt.LPVOID + +k32 = ct.WinDLL("kernel32.dll") +ws = ct.WinDLL("winspool.drv") + +k32.GetLastError.argtypes = () +k32.GetLastError.restype = wt.DWORD + +ws.ClosePrinter.argtypes = (wt.HANDLE,) +ws.ClosePrinter.restype = wt.BOOL + +ws.EnumJobsA.argtypes = (wt.HANDLE, wt.DWORD, wt.DWORD, wt.DWORD, wt.LPBYTE, wt.DWORD, wt.LPDWORD, wt.LPDWORD) +ws.EnumJobsA.restype = wt.BOOL + +ws.GetDefaultPrinterA.argtypes = (wt.LPSTR, wt.LPDWORD) +ws.GetDefaultPrinterA.restype = wt.BOOL + +ws.OpenPrinterA.argtypes = (wt.LPSTR, ct.POINTER(wt.HANDLE), LPPRINTER_DEFAULTS) +ws.OpenPrinterA.restype = wt.BOOL + +ws.ReadPrinter.argtypes = (wt.HANDLE, wt.LPVOID, wt.DWORD, wt.LPDWORD) +ws.ReadPrinter.restype = wt.BOOL + #-- Job Status meaning JOB_STATUS_PAUSED = 0x00000001 @@ -34,188 +59,187 @@ JOB_STATUS_USER_INTERVENTION = 0x00000400 JOB_STATUS_RESTART = 0x00000800 -class SYSTEMTIME(Structure): + +class SYSTEMTIME(ct.Structure): + _fields_ = [ + ("wYear", ct.c_short), + ("wMonth", ct.c_short), + ("wDayOfWeek", ct.c_short), + ("wDay", ct.c_short), + ("wHour", ct.c_short), + ("wMinute", ct.c_short), + ("wSecond", ct.c_short), + ("wMilliseconds", ct.c_short), + ] + + +class DEVMODE(ct.Structure): _fields_ = [ - ("wYear", c_short), - ("wMonth", c_short), - ("wDayOfWeek", c_short), - ("wDay", c_short), - ("wHour", c_short), - ("wMinute",c_short), - ("wSecond", c_short), - ("wMilliseconds", c_short) - ] - -class DEVMODE(Structure): - _fields_ = [ - ("dmDeviceName", c_char * 32), - ("dmSpecVersion", c_short), - ("dmDriverVersion", c_short), - ("dmSize", c_short), - ("dmDriverExtra", c_short), - ("dmFields", c_int), - ("dmOrientation", c_short), - ("dmPaperSize", c_short), - ("dmPaperLength", c_short), - ("dmPaperWidth", c_short), - ("dmScale", c_short), - ("dmCopies", c_short), - ("dmDefaultSource", c_short), - ("dmPrintQuality", c_short), - ("dmColor", c_short), - ("dmDuplex", c_short), - ("dmYResolution", c_short), - ("dmTTOption", c_short), - ("dmCollate", c_short), - ("dmFormName", c_char * 32), - ("dmLogPixels", c_int), - ("dmBitsPerPel", c_long), - ("dmPelsWidth", c_long), - ("dmPelsHeight", c_long), - ("dmDisplayFlags", c_long), - ("dmDisplayFrequency", c_long) + ("dmDeviceName", ct.c_char * 32), + ("dmSpecVersion", ct.c_short), + ("dmDriverVersion", ct.c_short), + ("dmSize", ct.c_short), + ("dmDriverExtra", ct.c_short), + ("dmFields", ct.c_int), + ("dmOrientation", ct.c_short), + ("dmPaperSize", ct.c_short), + ("dmPaperLength", ct.c_short), + ("dmPaperWidth", ct.c_short), + ("dmScale", ct.c_short), + ("dmCopies", ct.c_short), + ("dmDefaultSource", ct.c_short), + ("dmPrintQuality", ct.c_short), + ("dmColor", ct.c_short), + ("dmDuplex", ct.c_short), + ("dmYResolution", ct.c_short), + ("dmTTOption", ct.c_short), + ("dmCollate", ct.c_short), + ("dmFormName", ct.c_char * 32), + ("dmLogPixels", ct.c_int), + ("dmBitsPerPel", ct.c_long), + ("dmPelsWidth", ct.c_long), + ("dmPelsHeight", ct.c_long), + ("dmDisplayFlags", ct.c_long), + ("dmDisplayFrequency", ct.c_long), ] -class JOB_INFO_2(Structure): + + +class JOB_INFO_2(ct.Structure): _fields_ = [ - ("JobId", c_ulong), - ("pPrinterName", c_char_p), - ("pMachineName", c_char_p), - ("pUserName", c_char_p), - ("pDocument", c_char_p), - ("pNotifyName", c_char_p), - ("pDatatype", c_char_p), - ("pPrintProcessor", c_char_p), - ("pParameters", c_char_p), - ("pDriverName", c_char_p), - ("pDevMode", POINTER(DEVMODE)), - ("pStatus", c_char_p), - ("pSecurityDescriptor", c_void_p), - ("Status", c_ulong), - ("Priority", c_ulong), - ("Position",c_ulong), - ("StartTime", c_ulong), - ("UntilTime", c_ulong), - ("TotalPages", c_ulong), - ("Size", c_ulong), + ("JobId", ct.c_ulong), + ("pPrinterName", ct.c_char_p), + ("pMachineName", ct.c_char_p), + ("pUserName", ct.c_char_p), + ("pDocument", ct.c_char_p), + ("pNotifyName", ct.c_char_p), + ("pDatatype", ct.c_char_p), + ("pPrintProcessor", ct.c_char_p), + ("pParameters", ct.c_char_p), + ("pDriverName", ct.c_char_p), + ("pDevMode", ct.POINTER(DEVMODE)), + ("pStatus", ct.c_char_p), + ("pSecurityDescriptor", ct.c_void_p), + ("Status", ct.c_ulong), + ("Priority", ct.c_ulong), + ("Position", ct.c_ulong), + ("StartTime", ct.c_ulong), + ("UntilTime", ct.c_ulong), + ("TotalPages", ct.c_ulong), + ("Size", ct.c_ulong), ("Submitted", SYSTEMTIME), - ("Time", c_ulong), - ("PagesPrinted", c_ulong) - ] + ("Time", ct.c_ulong), + ("PagesPrinted", ct.c_ulong), + ] class Printer: def ReadPrinterData(self, hPrinter): - #-- Read Data from printer - pReadBuffer = c_buffer(500) # can make this dynamic depending on the job Size i.e. pJobInfo[i].Size - pBuf = addressof(pReadBuffer) - READ_BUFFER_SIZE = sizeof(pReadBuffer) - NoRead = c_ulong() - pNoRead = addressof(NoRead) + BufSize = 500 # can make this dynamic depending on the job Size i.e. pJobInfo[i].Size + pReadBuffer = ct.create_string_buffer(BufSize) + NoRead = ct.c_ulong() #-- Lets try to get the spool file ret = ws.ReadPrinter(hPrinter, - pBuf, - READ_BUFFER_SIZE, - pNoRead) - + pReadBuffer, + BufSize, + ct.byref(NoRead)) if ret: - print "".join([i for i in pReadBuffer]) - pBuf = None + print("Printer buffer:", b"".join([i for i in pReadBuffer])) + else: + print("ReadPrinter error: {:d}".format(k32.GetLastError())) pReadBuffer = None - + def GetDefaultPrinter(self): - #-- Get the default printer - plen = c_long() - ret = ws.GetDefaultPrinterA(None, byref(plen)) - pname = c_buffer(plen.value) - ret = ws.GetDefaultPrinterA(pname, byref(plen)) + plen = wt.DWORD() + ret = ws.GetDefaultPrinterA(None, ct.byref(plen)) + pname = ct.create_string_buffer(plen.value) + ret = ws.GetDefaultPrinterA(pname, ct.byref(plen)) return pname.value - - def OpenPrinter(self, prtname = None): + + def OpenPrinter(self, prtname=None): #-- Let open our printer - if prtname == None: - self.prtname = self.GetDefaultPrinter() - self.handle = c_ulong() - ret = ws.OpenPrinterA(self.prtname, byref(self.handle), None) + self.prtname = self.GetDefaultPrinter() if prtname is None else prtname + print("Attempting to open printer {:}".format(self.prtname)) + self.handle = wt.HANDLE() + ret = ws.OpenPrinterA(self.prtname, ct.byref(self.handle), None) + if not ret: + print("OpenPrinter error: {:d}".format(k32.GetLastError())) + self.handle = wt.HANDLE() return self.handle - def ClosePrinter(self, handle = None): + def ClosePrinter(self, handle=None): #-- Close our printer after opening it - if handle == None: + if handle is None: ws.ClosePrinter(self.handle) self.handle = None else: ws.ClosePrinter(handle) handle = None - + def EnumJobs(self, pJob, cbBuf): - #-- Enumerates printer jobs - FirstJob = c_ulong(0) #Start from this job + FirstJob = ct.c_ulong(0) #Start from this job self.NoJobs = 20 #How many jobs do you want to get? Level = 2 #JOB_INFO_2 Structure - self.pcbNeeded = c_ulong(0) # Bytes needed to fill the structure - self.nReturned = c_ulong(0) # No. of jobs returned + self.pcbNeeded = ct.c_ulong(0) # Bytes needed to fill the structure + self.nReturned = ct.c_ulong(0) # No. of jobs returned ret = ws.EnumJobsA(self.OpenPrinter(), - FirstJob, - self.NoJobs, - Level, - pJob, - cbBuf, - byref(self.pcbNeeded), - byref(self.nReturned)) - + FirstJob, + self.NoJobs, + Level, + pJob, + cbBuf, + ct.byref(self.pcbNeeded), + ct.byref(self.nReturned)) return ret - if __name__== "__main__": while 1: #-- Loop picking up printer jobs + print("\nChecking for printers (and jobs)...") prt = Printer() - + # we need to call EnumJobs() to find out how much memory you need # It should have failed if there are jobs on the printer - if not prt.EnumJobs(None,0): - + if not prt.EnumJobs(None, 0): + #-- Lets first close the printer prt.ClosePrinter() - - #-- Allocate JOB_INFO_2 structures - pJobArray = JOB_INFO_2 * prt.NoJobs - pJobInfo = pJobArray() - pJob = addressof(pJobInfo) + #-- Allocate buffer for JOB_INFO_2 structures + pJobBuf = ct.create_string_buffer(prt.pcbNeeded.value) #-- Call EnumJobs now with the correct memory needed from the first call - prt.EnumJobs(pJob, prt.pcbNeeded) - + prt.EnumJobs(ct.cast(pJobBuf, wt.LPBYTE), prt.pcbNeeded) + #-- Lets check whether we got any job from the second call if prt.nReturned.value: - for i in range (prt.nReturned.value): - print pJobInfo[i].JobId, pJobInfo[i].pDocument, pJobInfo[i].pUserName, pJobInfo[i].Status - - prtName = prt.GetDefaultPrinter() + JobArray = JOB_INFO_2 * prt.nReturned.value + jobInfo = JobArray.from_buffer(pJobBuf) + for i in range(prt.nReturned.value): + print("\nJob {:}:".format(i), jobInfo[i].JobId, jobInfo[i].pDocument, jobInfo[i].pUserName, jobInfo[i].Status) + + prtName = prt.GetDefaultPrinter() #-- Lets try and get the spool file. The call to open printer must have the jobid: - #-- Format "printername,Job xxxx" - prtJobName = prtName+","+"Job"+" "+str(pJobInfo[i].JobId) + #-- Format "PrinterName, Job xxxx" + prtJobName = "{:}, Job {:}".format(prtName.decode(), jobInfo[i].JobId).encode() pHandle = prt.OpenPrinter(prtJobName) - + if pHandle.value: #-- Read spool file. Does not work well if you do not have a bidirectional printer. prt.ReadPrinterData(pHandle) prt.ClosePrinter(pHandle) prt.ClosePrinter() - + #-- Clean up - pJob = None - pJobInfo = None + pJobBuf = None prt = None - + #-- Wait for the next printer job #-- Adjust this depending on the speed of your printer and network time.sleep(3)