Auditing Protocols in Control Systems

Thanks to the near-constant stream of “the sky is falling, these protocols aren’t secure” presentations at security conferences around the globe, everyone is familiar with the mainstream ICS protocols: EtherNet/IP, DNP3, and of course Modbus, amongst others. It is certainly important to make sure these protocols are implemented correctly so that the devices supporting them function reliably. Generally, though, these protocols aren’t on the “front lines”. They sit behind at least a couple of firewalls and probably a DMZ, so by the time someone interested in causing trouble has reached the parts of the network these protocols live on, they can already do anything they want.

So if implementations of these protocols aren’t the problem, what is? The proprietary protocols gluing together all the components in a system. These usually come in the form of a service listening on some not “well known” port, usually TCP, and they handle much of the inter-process and inter-system communication that the components need. They’re used for time synchronization, “heartbeats”, logging, service discovery, and just about anything else you can think of. For the most part they are documented only internally, and in some cases only in the source code of the application in question. This tends to lead to hastily written code that never sees a bit of negative testing.

But the protocol is unknown, so you have a few options for testing it. From easiest (and least likely to find any problems) to most difficult (and giving the most code coverage): you can cat /dev/urandom to the port and see what happens; capture some network traffic and do some dumb fuzzing; capture network traffic, do some protocol analysis, and do some smart fuzzing; or go all out and reverse the application line by line. Each step should yield more knowledge and more vulnerabilities, and cost quite a bit more.
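To make the “dumb fuzzing” option concrete, here is a minimal sketch in Python 3. It takes one captured message, overwrites a few random bytes, and sends the result to the service. The names `mutate` and `send_case`, the sample message, and the fixed seed are all made up for illustration; this is not the fuzzer used in the audit.

```python
import random
import socket


def mutate(sample, rng, max_changes=4):
    """Return a copy of sample with a few randomly chosen bytes overwritten."""
    data = bytearray(sample)
    if not data:
        return bytes(data)
    for _ in range(rng.randint(1, max_changes)):
        data[rng.randrange(len(data))] = rng.randrange(256)
    return bytes(data)


def send_case(host, port, payload, timeout=2.0):
    """Send one test case over TCP; return the reply, or None on any socket error."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.sendall(payload)
            return conn.recv(4096)
    except OSError:
        return None


if __name__ == "__main__":
    rng = random.Random(1234)            # fixed seed so an interesting case can be regenerated
    captured = b"\x01\x00\x00\x08HELO"   # hypothetical captured message
    for i in range(5):
        # Print instead of sending; swap in send_case(host, port, case) against a test target.
        case = mutate(captured, rng)
        print(i, case.hex())
```

Seeding the generator is the one design choice worth keeping even in a throwaway fuzzer: when the service falls over, you want to be able to rebuild the exact input that did it.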

Sometimes, though, these services are only used for communication between components running on the same system, even though they listen on the network. (They probably should use another mechanism, but it’s a shortcut often taken in the interest of future plans.) So how are you supposed to capture the traffic? On *nix systems it’s easy, but Windows doesn’t have a loopback interface that lets you sniff it. There are some workarounds, but I didn’t really like any of them, so I came up with my own solution.

Lately I’ve gotten into scripting debuggers, in the form of pydbg and Immunity Debugger, both of which have a nice Python API. So I thought I’d use Immunity Debugger’s built-in !packets function to hook the Windows socket functions and dump out everything that comes through them. That would have worked great for most applications, but of course the one I was auditing at the time didn’t want to cooperate, and after crashing the debugger a few times I figured it was time to investigate. Well, it turns out that security pros make the same mistakes we constantly berate developers for: the scripts built into Immunity Debugger didn’t check the return code of WSARecv (in EAX) before trying to read the memory. Sometimes this leads to garbage, sometimes it leads to a crash.
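For anyone who hasn’t stared at WSARecv on the stack before, here is a rough Python 3 sketch of the idea, run against a fake memory image instead of a live process. It assumes a 32-bit stdcall call, where at function entry [ESP] is the return address, [ESP+8] is lpBuffers (a WSABUF: a 4-byte len followed by a 4-byte buf pointer) and [ESP+0x10] is lpNumberOfBytesRecvd. `FakeMemory`, `wsarecv_args` and `received_bytes` are names invented for this illustration, not part of any debugger API.

```python
import struct


class FakeMemory:
    """Stand-in for a debugger's memory reads: a flat byte image starting at `base`."""

    def __init__(self, base, image):
        self.base = base
        self.image = image

    def read(self, address, size):
        start = address - self.base
        return self.image[start:start + size]

    def read_dword(self, address):
        return struct.unpack("<L", self.read(address, 4))[0]


def wsarecv_args(mem, esp):
    """Decode the WSARecv arguments of interest at function entry."""
    lp_buffers = mem.read_dword(esp + 0x08)             # LPWSABUF lpBuffers
    return {
        "return_address": mem.read_dword(esp),          # where a return hook would go
        "buf_len": mem.read_dword(lp_buffers),          # WSABUF.len
        "buf_ptr": mem.read_dword(lp_buffers + 4),      # WSABUF.buf
        "recv_count_ptr": mem.read_dword(esp + 0x10),   # lpNumberOfBytesRecvd
    }


def received_bytes(mem, args, eax):
    """Read the payload only if WSARecv reported success (EAX == 0)."""
    if eax != 0:
        return None   # the byte count and buffer contents cannot be trusted
    count = mem.read_dword(args["recv_count_ptr"])
    return mem.read(args["buf_ptr"], count)


# Hypothetical image: stack at 0x1000, WSABUF at 0x1020, count at 0x1028, data at 0x1030.
image = (
    struct.pack("<5L", 0x00401234, 0x44, 0x1020, 1, 0x1028) + b"\x00" * 12
    + struct.pack("<LL", 16, 0x1030)
    + struct.pack("<L", 4) + b"\x00" * 4
    + b"PING" + b"\x00" * 12
)
mem = FakeMemory(0x1000, image)
args = wsarecv_args(mem, 0x1000)
```

With overlapped I/O, WSARecv can return SOCKET_ERROR while the receive is still pending, and in that case the byte count has not been filled in yet. That is exactly the situation where reading the buffer blindly gives you garbage.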

A bit of quick scripting later the problem was solved, and I’m writing all the data being sent to the process out to a file, which we’ll analyze and feed into our fuzzer in the next step of the audit. The quick little script is below. Like most things I write, it’s just enough to get the job done and there are likely a few dozen ways to do it better, but it got results and fed nicely into my smarter dumb fuzzer.

from immlib import *
from immutils import *

import struct

class ret_hook(LogBpHook):
    """Fires at WSARecv's return address and dumps whatever was received."""
    def __init__(self):
        LogBpHook.__init__(self)

    def run(self, regs):
        imm = Debugger()
        # Pointers saved by set_hooks when WSARecv was entered.
        (payload_ptr, recv_ptr) = imm.getKnowledge("%08x" % regs['EIP'])
        return_value = regs['EAX']
        imm.Log("in ret_hook, EAX is %08x" % return_value)
        # WSARecv returns 0 on success; only then is the byte count valid.
        if return_value == 0:
            imm.Log("in ret_hook, recv_ptr is %08x" % recv_ptr)
            length = imm.readMemory(recv_ptr, 4)
            length = struct.unpack("<L", length)
            imm.Log("in ret_hook.run, length = %d" % length[0])

            # Reads of zero or one byte are skipped.
            if length[0] > 1:
                raw_payload = imm.readMemory(payload_ptr, int(length[0]))
                pack_len = str(int(length[0])) + "c"
                imm.Log("in ret_hook.run, pack_len = %s" % pack_len)

                if raw_payload is not None:
                    # Render each byte as a literal \xNN escape, one payload per line.
                    bin_payload = "".join("\\x%02x" % ord(c) for c in raw_payload)
                    imm.Log(bin_payload)
                    fout = open(r"Z:\FuzzShare\captured_bytes", "a")
                    print >>fout, bin_payload
                    fout.close()

        # One-shot hook: clean up so the next call can register a fresh one.
        imm.forgetKnowledge("%08x" % regs['EIP'])
        imm.disableBreakpoint(regs['EIP'])
        self.UnHook()

class set_hooks(LogBpHook):
    """Fires on entry to WSARecv and plants a ret_hook at the return address."""
    def __init__(self):
        LogBpHook.__init__(self)

    def run(self, regs):
        imm = Debugger()
        function_name = imm.getKnowledge("%08x" % regs['EIP'])
        imm.Log("In set_hooks")
        self.retrieve_packet(imm, function_name, regs)

    def retrieve_packet(self, imm, function_name, regs):
        # Always defined, so an unexpected function name cannot raise NameError.
        extended_hook = (function_name == "WSARecv")

        if extended_hook:
            # [ESP+8] is lpBuffers, a pointer to a WSABUF (len, then buf pointer).
            pbuffers = imm.readMemory(regs['ESP'] + 8, 4)
            pbuffers = struct.unpack("<L", pbuffers)
            imm.Log("Buffers start at 0x%08x" % pbuffers[0])

            payload_len = imm.readMemory(pbuffers[0], 4)
            payload_len = struct.unpack("<L", payload_len)

            payload_ptr = imm.readMemory(pbuffers[0] + 4, 4)
            payload_ptr = struct.unpack("<L", payload_ptr)
            imm.Log("Payload Pointer at: %08x" % payload_ptr[0])

            # [ESP+0x10] is lpNumberOfBytesRecvd.
            recv_ptr = imm.readMemory(regs['ESP'] + 0x10, 4)
            recv_ptr = struct.unpack("<L", recv_ptr)
            imm.Log("Recv_ptr at: %08x" % recv_ptr[0])

            # [ESP] is the return address; hook it to catch the result.
            esp_ptr = imm.readMemory(regs['ESP'], 4)
            esp_ptr = struct.unpack("<L", esp_ptr)
            imm.Log("[ESP] at 0x%08x" % esp_ptr[0])

            rh = ret_hook()
            rh.add("%08x" % esp_ptr[0], esp_ptr[0])

            imm.addKnowledge("%08x" % esp_ptr[0], (payload_ptr[0], recv_ptr[0]))

def main(args):
    imm = Debugger()
    hooks = set_hooks()

    module = imm.getModule(imm.getDebuggedName())

    if not module.isAnalysed():
        imm.analyseCode(module.getCodebase())

    imm.Pause()
    ws_wsarecv = imm.getAddress("ws2_32.WSARecv")
    imm.addKnowledge("%08x" % ws_wsarecv, "WSARecv")

    hooks.add("WSARecv", ws_wsarecv)

    return "WSARecv hook in place."
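
To get the captured data back into a form a fuzzer can use, something like the following Python 3 sketch should do. It assumes each line of the capture file holds one payload written as literal \xNN escapes, which is what the script above appears intended to produce. The helper names `decode_line` and `load_captures` are made up for this example.

```python
import re

# Matches one escaped byte: a backslash, an 'x', and two hex digits.
HEX_ESCAPE = re.compile(r"\\x([0-9a-fA-F]{2})")


def decode_line(line):
    """Turn one line of hex escapes back into raw bytes."""
    return bytes(int(pair, 16) for pair in HEX_ESCAPE.findall(line))


def load_captures(path):
    """Read a capture file and return one bytes object per non-empty line."""
    with open(path) as handle:
        return [decode_line(line) for line in handle if line.strip()]
```

Each entry returned by `load_captures` is one received message, ready to be used as a seed for mutation.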