from xnu import *
import struct
def GetSeqCount(seq):
return (seq >> 8)
def GetLSeqBits(seq):
rv = ""
if seq & 0x1:
rv += "K"
if seq & 0x2:
rv += "E"
if seq & 0x4:
rv += "W"
if seq & 0x20:
rv += "M"
if seq & 0x40:
rv += "U"
if seq & 0x80:
rv += "I"
return rv
def GetSSeqBits(seq):
rv = ""
if seq & 0x1:
rv += "S"
if seq & 0x2:
rv += "I"
if seq & 0x4:
rv += "Ws"
return rv
def GetLSeqSummary(seq):
return "{:d} {:s}".format(GetSeqCount(seq), GetLSeqBits(seq))
def GetSSeqSummary(seq):
return "{:d} {:s}".format(GetSeqCount(seq), GetSSeqBits(seq))
@header("{0: <24s} {1: <16s} {2: <16s} {3: <16s} {4: <16s}".format('sig', 'tid', 'options', 'lseq', 'useq'))
def GetUserMutexSummary(task, uaddr):
if int(task.t_flags) & 0x1:
mtxlayout = "QIIhhIQIII"
padoffset = 1
else:
mtxlayout = "QIIhhQIII"
padoffset = 0
data = GetUserDataAsString(task, unsigned(uaddr), struct.calcsize(mtxlayout))
info = struct.unpack(mtxlayout, data)
format = "{0: <24s} {1: <16s} {2: <16s} {3: <16s} {4: <16s}"
sigstr = str("{0: <#020x}".format(info[0]))
if info[2] & 0x800:
lseq = info[7+padoffset]
useq = info[8+padoffset]
else:
lseq = info[6+padoffset]
useq = info[7+padoffset]
return format.format(sigstr, hex(info[5+padoffset]), hex(info[2]), hex(lseq), hex(useq))
@lldb_command('showusermutex')
def PthreadShowUserMutex(cmd_args=None):
"""
display information about a userspace mutex at a given address
Syntax: (lldb) showusermutex <task_t> <uaddr>
"""
if not cmd_args:
raise ArgumentError("No arguments passed")
task = kern.GetValueFromAddress(cmd_args[0], "task_t")
uaddr = kern.GetValueFromAddress(cmd_args[1], "user_addr_t")
print GetUserMutexSummary.header
print GetUserMutexSummary(task, uaddr)
@lldb_type_summary(['ksyn_wait_queue *', 'ksyn_wait_queue_t'])
@header("{:<20s} {:<20s} {:<10s} {:<6s} {:<6s} {:<8s} {:<8s} {:<8s} {:<8s}".format('kwq', 'uaddr', 'type', 'pflags', 'kflags', 'refs', 'indrop', 'waiters', 'preposts'))
def GetKwqSummary(kwq):
format = "{:<#20x} {:<#20x} {:<10s} {:<6s} {:<6s} {:<8d} {:<8d} {:<8d} {:<8d}\n"
kwq = Cast(kwq, "ksyn_wait_queue_t")
kwqtype = ""
if kwq.kw_type & 0xff == 0x01:
kwqtype = "mtx"
if kwq.kw_type & 0xff == 0x02:
kwqtype = "cvar"
if kwq.kw_type & 0xff == 0x04:
kwqtype = "rwl"
if kwq.kw_type & 0xff == 0x05:
kwqtype = "sema"
if kwq.kw_type & 0x1000 == 0x1000:
kwqtype += "W" if kwq.kw_type & 0x2000 == 0x2000:
kwqtype += "D"
pflags = ""
if kwq.kw_pflags & 0x2:
pflags += "H" if kwq.kw_pflags & 0x4:
pflags += "S" if kwq.kw_pflags & 0x8:
pflags += "W" if kwq.kw_pflags & 0x10:
pflags += "F"
kflags = ""
if kwq.kw_kflags & 0x1:
kflags += "C" if kwq.kw_kflags & 0x2:
kflags += "Z" if kwq.kw_kflags & 0x4:
kflags += "Q" if kwq.kw_kflags & 0x8:
kflags += "O"
rs = format.format(kwq, kwq.kw_addr, kwqtype, pflags, kflags, kwq.kw_iocount, kwq.kw_dropcount, kwq.kw_inqueue, kwq.kw_fakecount)
rs += "\t{:<10s} {:<10s} {:<10s} {:<10s} {:<10s} {:<10s} {:<10s}\n".format('lowest', 'highest', 'lword', 'uword', 'sword', 'last', 'next')
rs += "\t{:<10d} {:<10d} {:<10s} {:<10d} {:<10s} {:<10s} {:<10s}\n".format(
GetSeqCount(kwq.kw_lowseq), GetSeqCount(kwq.kw_highseq),
GetLSeqSummary(kwq.kw_lword), GetSeqCount(kwq.kw_uword),
GetSSeqSummary(kwq.kw_sword), GetSSeqSummary(kwq.kw_lastseqword),
GetSSeqSummary(kwq.kw_nextseqword))
rs += "\t{:<10s} {:<10s} {:<10s} {:<10s} {:<10s} {:<10s} {:<10s}\n".format(
'pposts', 'lseq', 'sseq', 'intr', 'count', 'seq', 'bits')
intr_type = "NONE"
if kwq.kw_intr.type == 0x1:
intr_type = "READ"
elif kwq.kw_intr.type == 0x2:
intr_type = "WRITE"
rs += "\t{:<10d} {:<10s} {:<10s} {:<10s} {:<10d} {:<10s} {:<10s}\n".format(
kwq.kw_prepost.count,
GetLSeqSummary(kwq.kw_prepost.lseq), GetSSeqSummary(kwq.kw_prepost.sseq),
intr_type, kwq.kw_intr.count,
GetSSeqSummary(kwq.kw_intr.seq), GetSSeqSummary(kwq.kw_intr.returnbits))
rs += "\twaiting readers:\n"
for kwe in IterateTAILQ_HEAD(kwq.kw_ksynqueues[0].ksynq_kwelist, 'kwe_list'):
rs += "\t" + GetKweSummary.header + "\n"
rs += "\t" + GetKweSummary(kwe) + "\n"
rs += "\twaiting writers:\n"
for kwe in IterateTAILQ_HEAD(kwq.kw_ksynqueues[1].ksynq_kwelist, 'kwe_list'):
rs += "\t" + GetKweSummary.header + "\n"
rs += "\t" + GetKweSummary(kwe) + "\n"
if kwq.kw_turnstile:
rs += GetTurnstileSummary.header + "\n"
rs += GetTurnstileSummary(Cast(kwq.kw_turnstile, "struct turnstile *"))
return rs
@lldb_type_summary(['ksyn_waitq_element *', 'ksyn_waitq_element_t'])
@header("{:<20s} {:<20s} {:<10s} {:<10s} {:<20s} {:<20s}".format('kwe', 'kwq', 'lseq', 'state', 'uthread', 'thread'))
def GetKweSummary(kwe):
format = "{:<#20x} {:<#20x} {:<10s} {:<10s} {:<#20x} {:<#20x}"
kwe = Cast(kwe, 'struct ksyn_waitq_element *')
state = ""
if kwe.kwe_state == 1:
state = "INWAIT"
elif kwe.kwe_state == 2:
state = "PPOST"
elif kwe.kwe_state == 3:
state = "BROAD"
else:
state = "{:#10x}".format(kwe.kwe_state)
return format.format(kwe, kwe.kwe_kwqqueue, GetLSeqSummary(kwe.kwe_lockseq), state, kwe.kwe_uth, kwe.kwe_thread)
@header("{0: <24s} {1: <24s} {2: <24s}".format('thread', 'thread_id', 'uthread'))
def GetPthreadSummary(thread):
format = "{0: <24s} {1: <24s} {2: <24s}"
threadstr = str("{0: <#020x}".format(thread))
if int(thread.static_param):
threadstr += "[WQ]"
uthread = Cast(thread.uthread, "uthread_t")
uthreadstr = str("{0: <#020x}".format(uthread))
return format.format(threadstr, hex(thread.thread_id), uthreadstr)
@header("{0: <24s} {1: <24s} {2: <10s} {3: <10s} {4: <10s} {5: <10s} {6: <10s}".format('proc', 'wq', 'sched', 'req', 'idle', 'wq_flags', 'wq_lflags'))
def GetPthreadWorkqueueSummary(wq):
format = "{0: <24s} {1: <24s} {2: <10d} {3: <10d} {4: <10d} {5: <10s} {6: <10s}"
procstr = str("{0: <#020x}".format(wq.wq_proc))
wqstr = str("{0: <#020x}".format(wq))
flags = []
if wq.wq_flags & 0x1:
flags.append("I")
if wq.wq_flags & 0x2:
flags.append("R")
if wq.wq_flags & 0x4:
flags.append("E")
wqflags = []
if wq.wq_lflags & 0x1:
wqflags.append("B")
if wq.wq_lflags & 0x2:
wqflags.append("W")
if wq.wq_lflags & 0x4:
wqflags.append("C")
if wq.wq_lflags & 0x8:
wqflags.append("L")
return format.format(procstr, wqstr, wq.wq_threads_scheduled, wq.wq_reqcount, wq.wq_thidlecount, "".join(flags), "".join(wqflags))
@header("{0: <24s} {1: <5s} {2: <5s} {3: <5s} {4: <5s} {5: <5s} {6: <5s} {7: <5s}".format('category', 'uint', 'uinit', 'lgcy', 'util', 'bckgd', 'maint', 'event'))
def GetPthreadWorkqueueDetail(wq):
format = " {0: <22s} {1: <5d} {2: <5d} {3: <5d} {4: <5d} {5: <5d} {6: <5d} {7: <5d}"
schedstr = format.format('scheduled', wq.wq_thscheduled_count[0], wq.wq_thscheduled_count[1], wq.wq_thscheduled_count[2], wq.wq_thscheduled_count[3], wq.wq_thscheduled_count[4], wq.wq_thscheduled_count[5], wq.wq_thscheduled_count[6])
activestr = format.format('active', wq.wq_thactive_count[0], wq.wq_thactive_count[1], wq.wq_thactive_count[2], wq.wq_thactive_count[3], wq.wq_thactive_count[4], wq.wq_thactive_count[5], wq.wq_thactive_count[6])
return "\n".join([schedstr, activestr])
@lldb_command('showthreadpsynch')
def PthreadCurrentMutex(cmd_args=None):
"""
display information about a thread's pthread state
Syntax: (lldb) showthreadpsync <thread_t>
"""
if not cmd_args:
raise ArgumentError("No arguments passed")
thread = kern.GetValueFromAddress(cmd_args[0], "thread_t")
print GetPthreadSummary.header
print GetPthreadSummary(thread)
uthread = Cast(thread.uthread, "uthread_t")
kwe = Cast(addressof(uthread.uu_save.uus_kwe), 'struct ksyn_waitq_element *')
if not kwe or not kwe.kwe_kwqqueue:
print GetKweSummary.header
print GetKweSummary(kwe)
else:
print GetKwqSummary.header
print GetKwqSummary(kwe.kwe_kwqqueue)
@lldb_command('showpthreadkwq')
def PthreadShowKsynQueue(cmd_args=None):
"""
display information about a pthread ksyn_wait_queue_t
Syntax: (lldb) showpthreadkwq <ksyn_wait_queue_t>
"""
if not cmd_args:
raise ArgumentError("No arguments passed")
kwq = kern.GetValueFromAddress(cmd_args[0], "ksyn_wait_queue_t")
print GetKwqSummary.header
print GetKwqSummary(kwq)
@lldb_command('showpthreadkwe')
def PthreadShowKsynElement(cmd_args=None):
"""
display information about a thread's ksyn_waitq_element
Syntax: (lldb) showpthreadkwe <ksyn_waitq_element_t>
"""
if not cmd_args:
raise ArgumentError("No arguments passed")
kwe = kern.GetValueFromAddress(cmd_args[0], "struct ksyn_waitq_element *")
print GetKweSummary.header
print GetKweSummary(kwe)
@lldb_command('showpthreadworkqueue')
def ShowPthreadWorkqueue(cmd_args=None):
"""
display information about a processes' pthread workqueue
Syntax: (lldb) showpthreadworkqueue <proc_t>
"""
if not cmd_args:
raise ArgumentError("No arguments passed")
proc = kern.GetValueFromAddress(cmd_args[0], "proc_t")
wq = Cast(proc.p_wqptr, "struct workqueue *");
print GetPthreadWorkqueueSummary.header
print GetPthreadWorkqueueSummary(wq)
print GetPthreadWorkqueueDetail.header
print GetPthreadWorkqueueDetail(wq)
def IterateTAILQ_HEAD(headval, element_name):
""" iterate over a TAILQ_HEAD in kernel. refer to bsd/sys/queue.h
params:
headval - value : value object representing the head of the list
element_name- str : string name of the field which holds the list links.
returns:
A generator does not return. It is used for iterating.
value : an object that is of type as headval->tqh_first. Always a pointer object
example usage:
list_head = kern.GetGlobalVariable('mountlist')
for entryobj in IterateTAILQ_HEAD(list_head, 'mnt_list'):
print GetEntrySummary(entryobj)
"""
iter_val = headval.tqh_first
while unsigned(iter_val) != 0 :
yield iter_val
iter_val = iter_val.__getattr__(element_name).tqe_next
def __lldb_init_module(debugger, internal_dict):
pass