Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes and updates #78

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 150 additions & 126 deletions recipes/Python/305690_Enumerate_printer_job/recipe-305690.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import time
from ctypes import *
import ctypes as ct
from ctypes import wintypes as wt


"""
This script enumerates printer jobs from a specified default printer. This information includes Jobid, Document name,
Expand All @@ -18,7 +20,30 @@
license GPL

"""
ws = WinDLL("winspool.drv")

LPPRINTER_DEFAULTS = wt.LPVOID

k32 = ct.WinDLL("kernel32.dll")
ws = ct.WinDLL("winspool.drv")

k32.GetLastError.argtypes = ()
k32.GetLastError.restype = wt.DWORD

ws.ClosePrinter.argtypes = (wt.HANDLE,)
ws.ClosePrinter.restype = wt.BOOL

ws.EnumJobsA.argtypes = (wt.HANDLE, wt.DWORD, wt.DWORD, wt.DWORD, wt.LPBYTE, wt.DWORD, wt.LPDWORD, wt.LPDWORD)
ws.EnumJobsA.restype = wt.BOOL

ws.GetDefaultPrinterA.argtypes = (wt.LPSTR, wt.LPDWORD)
ws.GetDefaultPrinterA.restype = wt.BOOL

ws.OpenPrinterA.argtypes = (wt.LPSTR, ct.POINTER(wt.HANDLE), LPPRINTER_DEFAULTS)
ws.OpenPrinterA.restype = wt.BOOL

ws.ReadPrinter.argtypes = (wt.HANDLE, wt.LPVOID, wt.DWORD, wt.LPDWORD)
ws.ReadPrinter.restype = wt.BOOL


#-- Job Status meaning
JOB_STATUS_PAUSED = 0x00000001
Expand All @@ -34,188 +59,187 @@
JOB_STATUS_USER_INTERVENTION = 0x00000400
JOB_STATUS_RESTART = 0x00000800

class SYSTEMTIME(Structure):

class SYSTEMTIME(ct.Structure):
_fields_ = [
("wYear", ct.c_short),
("wMonth", ct.c_short),
("wDayOfWeek", ct.c_short),
("wDay", ct.c_short),
("wHour", ct.c_short),
("wMinute", ct.c_short),
("wSecond", ct.c_short),
("wMilliseconds", ct.c_short),
]


class DEVMODE(ct.Structure):
_fields_ = [
("wYear", c_short),
("wMonth", c_short),
("wDayOfWeek", c_short),
("wDay", c_short),
("wHour", c_short),
("wMinute",c_short),
("wSecond", c_short),
("wMilliseconds", c_short)
]

class DEVMODE(Structure):
_fields_ = [
("dmDeviceName", c_char * 32),
("dmSpecVersion", c_short),
("dmDriverVersion", c_short),
("dmSize", c_short),
("dmDriverExtra", c_short),
("dmFields", c_int),
("dmOrientation", c_short),
("dmPaperSize", c_short),
("dmPaperLength", c_short),
("dmPaperWidth", c_short),
("dmScale", c_short),
("dmCopies", c_short),
("dmDefaultSource", c_short),
("dmPrintQuality", c_short),
("dmColor", c_short),
("dmDuplex", c_short),
("dmYResolution", c_short),
("dmTTOption", c_short),
("dmCollate", c_short),
("dmFormName", c_char * 32),
("dmLogPixels", c_int),
("dmBitsPerPel", c_long),
("dmPelsWidth", c_long),
("dmPelsHeight", c_long),
("dmDisplayFlags", c_long),
("dmDisplayFrequency", c_long)
("dmDeviceName", ct.c_char * 32),
("dmSpecVersion", ct.c_short),
("dmDriverVersion", ct.c_short),
("dmSize", ct.c_short),
("dmDriverExtra", ct.c_short),
("dmFields", ct.c_int),
("dmOrientation", ct.c_short),
("dmPaperSize", ct.c_short),
("dmPaperLength", ct.c_short),
("dmPaperWidth", ct.c_short),
("dmScale", ct.c_short),
("dmCopies", ct.c_short),
("dmDefaultSource", ct.c_short),
("dmPrintQuality", ct.c_short),
("dmColor", ct.c_short),
("dmDuplex", ct.c_short),
("dmYResolution", ct.c_short),
("dmTTOption", ct.c_short),
("dmCollate", ct.c_short),
("dmFormName", ct.c_char * 32),
("dmLogPixels", ct.c_int),
("dmBitsPerPel", ct.c_long),
("dmPelsWidth", ct.c_long),
("dmPelsHeight", ct.c_long),
("dmDisplayFlags", ct.c_long),
("dmDisplayFrequency", ct.c_long),
]
class JOB_INFO_2(Structure):


class JOB_INFO_2(ct.Structure):
_fields_ = [
("JobId", c_ulong),
("pPrinterName", c_char_p),
("pMachineName", c_char_p),
("pUserName", c_char_p),
("pDocument", c_char_p),
("pNotifyName", c_char_p),
("pDatatype", c_char_p),
("pPrintProcessor", c_char_p),
("pParameters", c_char_p),
("pDriverName", c_char_p),
("pDevMode", POINTER(DEVMODE)),
("pStatus", c_char_p),
("pSecurityDescriptor", c_void_p),
("Status", c_ulong),
("Priority", c_ulong),
("Position",c_ulong),
("StartTime", c_ulong),
("UntilTime", c_ulong),
("TotalPages", c_ulong),
("Size", c_ulong),
("JobId", ct.c_ulong),
("pPrinterName", ct.c_char_p),
("pMachineName", ct.c_char_p),
("pUserName", ct.c_char_p),
("pDocument", ct.c_char_p),
("pNotifyName", ct.c_char_p),
("pDatatype", ct.c_char_p),
("pPrintProcessor", ct.c_char_p),
("pParameters", ct.c_char_p),
("pDriverName", ct.c_char_p),
("pDevMode", ct.POINTER(DEVMODE)),
("pStatus", ct.c_char_p),
("pSecurityDescriptor", ct.c_void_p),
("Status", ct.c_ulong),
("Priority", ct.c_ulong),
("Position", ct.c_ulong),
("StartTime", ct.c_ulong),
("UntilTime", ct.c_ulong),
("TotalPages", ct.c_ulong),
("Size", ct.c_ulong),
("Submitted", SYSTEMTIME),
("Time", c_ulong),
("PagesPrinted", c_ulong)
]
("Time", ct.c_ulong),
("PagesPrinted", ct.c_ulong),
]


class Printer:

def ReadPrinterData(self, hPrinter):

#-- Read Data from printer
pReadBuffer = c_buffer(500) # can make this dynamic depending on the job Size i.e. pJobInfo[i].Size
pBuf = addressof(pReadBuffer)
READ_BUFFER_SIZE = sizeof(pReadBuffer)
NoRead = c_ulong()
pNoRead = addressof(NoRead)
BufSize = 500 # can make this dynamic depending on the job Size i.e. pJobInfo[i].Size
pReadBuffer = ct.create_string_buffer(BufSize)
NoRead = ct.c_ulong()

#-- Lets try to get the spool file
ret = ws.ReadPrinter(hPrinter,
pBuf,
READ_BUFFER_SIZE,
pNoRead)

pReadBuffer,
BufSize,
ct.byref(NoRead))
if ret:
print "".join([i for i in pReadBuffer])
pBuf = None
print("Printer buffer:", b"".join([i for i in pReadBuffer]))
else:
print("ReadPrinter error: {:d}".format(k32.GetLastError()))
pReadBuffer = None

def GetDefaultPrinter(self):

#-- Get the default printer
plen = c_long()
ret = ws.GetDefaultPrinterA(None, byref(plen))
pname = c_buffer(plen.value)
ret = ws.GetDefaultPrinterA(pname, byref(plen))
plen = wt.DWORD()
ret = ws.GetDefaultPrinterA(None, ct.byref(plen))
pname = ct.create_string_buffer(plen.value)
ret = ws.GetDefaultPrinterA(pname, ct.byref(plen))
return pname.value
def OpenPrinter(self, prtname = None):

def OpenPrinter(self, prtname=None):
#-- Let open our printer
if prtname == None:
self.prtname = self.GetDefaultPrinter()
self.handle = c_ulong()
ret = ws.OpenPrinterA(self.prtname, byref(self.handle), None)
self.prtname = self.GetDefaultPrinter() if prtname is None else prtname
print("Attempting to open printer {:}".format(self.prtname))
self.handle = wt.HANDLE()
ret = ws.OpenPrinterA(self.prtname, ct.byref(self.handle), None)
if not ret:
print("OpenPrinter error: {:d}".format(k32.GetLastError()))
self.handle = wt.HANDLE()
return self.handle

def ClosePrinter(self, handle = None):
def ClosePrinter(self, handle=None):
#-- Close our printer after opening it
if handle == None:
if handle is None:
ws.ClosePrinter(self.handle)
self.handle = None
else:
ws.ClosePrinter(handle)
handle = None

def EnumJobs(self, pJob, cbBuf):

#-- Enumerates printer jobs
FirstJob = c_ulong(0) #Start from this job
FirstJob = ct.c_ulong(0) #Start from this job
self.NoJobs = 20 #How many jobs do you want to get?
Level = 2 #JOB_INFO_2 Structure
self.pcbNeeded = c_ulong(0) # Bytes needed to fill the structure
self.nReturned = c_ulong(0) # No. of jobs returned
self.pcbNeeded = ct.c_ulong(0) # Bytes needed to fill the structure
self.nReturned = ct.c_ulong(0) # No. of jobs returned

ret = ws.EnumJobsA(self.OpenPrinter(),
FirstJob,
self.NoJobs,
Level,
pJob,
cbBuf,
byref(self.pcbNeeded),
byref(self.nReturned))

FirstJob,
self.NoJobs,
Level,
pJob,
cbBuf,
ct.byref(self.pcbNeeded),
ct.byref(self.nReturned))
return ret



if __name__== "__main__":
while 1:
#-- Loop picking up printer jobs
print("\nChecking for printers (and jobs)...")
prt = Printer()

# we need to call EnumJobs() to find out how much memory you need
# It should have failed if there are jobs on the printer
if not prt.EnumJobs(None,0):
if not prt.EnumJobs(None, 0):

#-- Lets first close the printer
prt.ClosePrinter()

#-- Allocate JOB_INFO_2 structures
pJobArray = JOB_INFO_2 * prt.NoJobs
pJobInfo = pJobArray()
pJob = addressof(pJobInfo)

#-- Allocate buffer for JOB_INFO_2 structures
pJobBuf = ct.create_string_buffer(prt.pcbNeeded.value)
#-- Call EnumJobs now with the correct memory needed from the first call
prt.EnumJobs(pJob, prt.pcbNeeded)
prt.EnumJobs(ct.cast(pJobBuf, wt.LPBYTE), prt.pcbNeeded)

#-- Lets check whether we got any job from the second call
if prt.nReturned.value:
for i in range (prt.nReturned.value):
print pJobInfo[i].JobId, pJobInfo[i].pDocument, pJobInfo[i].pUserName, pJobInfo[i].Status

prtName = prt.GetDefaultPrinter()
JobArray = JOB_INFO_2 * prt.nReturned.value
jobInfo = JobArray.from_buffer(pJobBuf)
for i in range(prt.nReturned.value):
print("\nJob {:}:".format(i), jobInfo[i].JobId, jobInfo[i].pDocument, jobInfo[i].pUserName, jobInfo[i].Status)

prtName = prt.GetDefaultPrinter()

#-- Lets try and get the spool file. The call to open printer must have the jobid:
#-- Format "printername,Job xxxx"
prtJobName = prtName+","+"Job"+" "+str(pJobInfo[i].JobId)
#-- Format "PrinterName, Job xxxx"
prtJobName = "{:}, Job {:}".format(prtName.decode(), jobInfo[i].JobId).encode()
pHandle = prt.OpenPrinter(prtJobName)

if pHandle.value:
#-- Read spool file. Does not work well if you do not have a bidirectional printer.
prt.ReadPrinterData(pHandle)
prt.ClosePrinter(pHandle)
prt.ClosePrinter()

#-- Clean up
pJob = None
pJobInfo = None
pJobBuf = None
prt = None

#-- Wait for the next printer job
#-- Adjust this depending on the speed of your printer and network
time.sleep(3)