Date: Mon, 17 Aug 2009 06:50:57 GMT From: Zachariah Riggle <zjriggl@FreeBSD.org> To: Perforce Change Reviews <perforce@FreeBSD.org> Subject: PERFORCE change 167424 for review Message-ID: <200908170650.n7H6ovNw003066@repoman.freebsd.org>
next in thread | raw e-mail | index | archive | help
http://perforce.freebsd.org/chv.cgi?CH=167424 Change 167424 by zjriggl@zjriggl_tcpregression on 2009/08/17 06:50:01 Final commit for SoC. Everything should work after this. I'm gonna download it all into my FreeBSD VM and make sure the tests run. Affected files ... .. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/CHANGES#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/LICENSE#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/Makefile#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/README#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/pcap.pyx#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/pcap_ex.c#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/pcap_ex.h#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/setup.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/tests/__init__.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/tests/packetFilterTest.pcap#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/tests/test.pcap#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/tests/testPcap.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/testsniff.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/echoServer.py#6 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcs/pcap/CHANGES#4 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcs/pcap/README#4 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/StringField.py#4 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/__init__.py#10 edit .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/backup.tar#4 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/hwAddress.py#5 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/hwaddress.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/ipAddress.py#7 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/ipaddress.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/networkPort.py#6 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/networkport.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/payload.py#5 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/pseudoipv4.py#4 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/segmentBuffer.py#4 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/segmentbuffer.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/sequence.py#2 edit .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/sniffLocalhost.py#6 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpConstructor.py#7 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpFilter.py#8 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpconstructor.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpfilter.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpstatemachine.py#10 edit .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/test.html#6 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/test.py#2 delete .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/__init__.py#2 edit .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/echoclient.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/echoserver.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/logging.conf#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/segmentBufferTest.py#2 edit .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/tcpFilterTest.py#2 edit .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/test.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testSequence.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testStates.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testTcpConstructor.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testTcpHandshake.py#1 add .. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testTcpStateMachine.py#1 add Differences ... ==== //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/__init__.py#10 (text+ko) ==== @@ -4,6 +4,7 @@ import socket import sys import re +import binascii from pcs.packets.ipv4 import ipv4 from pcs.packets.tcp import tcp from pcs.packets.tcpv6 import tcpv6 @@ -37,4 +38,3 @@ x = re.sub( octet, r"\1 ", x ) x = re.sub( twoBytes, r"\1\n", x ) return x - ==== //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/sequence.py#2 (text+ko) ==== @@ -5,55 +5,69 @@ ''' from pcsextension import findTcpLayer -def seq(x): +def seq( x ): # It is essential to remember that the actual sequence number space is # finite, though very large. This space ranges from 0 to 2**32 - 1. # Since the space is finite, all arithmetic dealing with sequence # numbers must be performed modulo 2**32. This unsigned arithmetic # preserves the relationship of sequence numbers as they cycle from # 2**32 - 1 to 0 again. - return x % (2**32) + return x % ( 2 ** 32 ) -class sequenced(str): +class Sequenced( str ): ''' This class exists to encapsulate sequenced items in the TCP stream. Each byte (octet) is assigned a sequence number, as is the original SYN and final - FIN. + FIN. + + Raw data CAN be encapsulated in a Sequenced object, but doing so will use + additional memory and is not necessary. The Sequenced class should only + be used to denote special sequences that have either the SYN or FIN bits + set (each of which can have the ACK bits set). + + For example, a data stream may look like the following... + + Client + [SYN] datadatadatadata + [SYN\ACK] + Server ''' syn = False fin = False ack = False - - def __new__(cls, data='', syn=False, fin=False, ack=True): - # New has to be used, because the str type is immutable. - if len(data) == 0: - if syn: - data += "[SYN]" - if fin: - data += "[FIN]" - if ack: - data += "[ACK]" - - obj = super( sequenced, cls).__new__( sequenced, data or "" ) + + # New has to be used, because the str type is immutable. + def __new__( cls, syn = False, fin = False, ack = False ): + + # Don't overwrite any user-provided data. + data = '' + if syn and not ack: + data += "[SYN]" + elif fin and not ack: + data += "[FIN]" + elif syn and ack: + data += "[SYN\ACK]" + elif fin and ack: + data += "[FIN\ACK]" + + # Check for invalid flag combos. + if ack and not ( syn or fin ): + raise Warning, "The 'ack' bit should not be set by itself. This is not " \ + "the intended use of the class Sequenced." + if not ( syn or ack or fin ): + raise Warning, "Did not set any flags." + + obj = super( Sequenced, cls ).__new__( Sequenced, data or "" ) + + # The below statements allow us to use True/False or 1/0 or any other + # expression that can be evaluated to a boolean. obj.syn = True if syn else False obj.ack = True if ack else False obj.fin = True if fin else False - return obj - -def makeSequenced(packet): - - t = findTcpLayer(packet) - - if t.syn or t.fin: - return [sequenced(syn=t.syn, ack=t.ack, fin=t.fin)] - elif t.data is not None: - return [sequenced(byte,ack=t.ack) for byte in t.data] - - return [] - + return obj # Special cases defined here for convenience... -seq_syn = sequenced(syn=True,ack=False) -seq_synack = sequenced(syn=True,ack=True) -seq_fin = sequenced(fin=True,ack=False) -seq_finack = sequenced(fin=True,ack=True)+seq_syn = Sequenced( syn = True, ack = False ) +seq_synack = Sequenced( syn = True, ack = True ) +seq_fin = Sequenced( fin = True, ack = False ) +seq_finack = Sequenced( fin = True, ack = True ) ==== //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpstatemachine.py#10 (text+ko) ==== @@ -9,20 +9,20 @@ from pcsextension import findTcpLayer from pcsextension.checksum import tcpChecksum from pcsextension.decorators import prop, validateTypes, uint16, uint32, synchronized -from pcsextension.hwAddress import HwAddress -from pcsextension.ipAddress import IpAddress -from pcsextension.networkPort import NetworkPort -from segmentBuffer import segmentBuffer -from sequence import seq, sequenced, makeSequenced, seq_fin, seq_finack, seq_syn, seq_synack -from tcpConstructor import tcpConstructor -from tcpFilter import tcpFilter +from pcsextension.hwaddress import HwAddress +from pcsextension.ipaddress import IpAddress +from pcsextension.networkport import NetworkPort +from segmentbuffer import SegmentBuffer +from sequence import seq, Sequenced, seq_fin, seq_finack, seq_syn, seq_synack +from tcpconstructor import tcpConstructor +from tcpfilter import TcpFilter from tcprecvdaemon import TcpRecvDaemon from tcpsenddaemon import TcpSendDaemon from tcpstates import CLOSE_WAIT, CLOSED, CLOSING, ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, \ LAST_ACK, LISTEN, SYN_RECEIVED, SYN_SENT, TIME_WAIT, TcpState, synchronizedStates, tcpStates from time import time from threading import RLock -import pcap +import pcs.pcap as pcap import pcs import testconfig import time @@ -116,12 +116,59 @@ 3 ''' - __constructor = tcpConstructor() - __connector = tcpFilter( testconfig.interface ) + __constructor = None + __tcpFilter = None __recvThread = None __sendThread = None lock = RLock() + sendLock = RLock() + recvLock = RLock() + + def __init__( self, localhost = ( testconfig.localIP, testconfig.localPort ), + remotehost = ( testconfig.remoteIP, testconfig.remotePort ) ): + ''' + Initialize the TCP State Machine. The information for the local host and the + remote host must be provided as a 2 - tuple, a string and an integer. + + Example: + >>> t = TcpStateMachine( ( "127.0.0.1", 10000 ), ( "192.168.0.1", 80 ) ) + ''' + # Required objects. + # self.lock = RLock() + self.__constructor = tcpConstructor() + self.__tcpFilter = TcpFilter( testconfig.interface ) + self._inboundSequences = SegmentBuffer() + self._outboundSequences = SegmentBuffer() + + # Logger + self.log = tcplog( self ) + self.log.debug( "(%s, %s)" % ( localhost, remotehost ) ) + + # Set IP and host + if validateTypes( {localhost: tuple} ) and len( localhost ) == 2: + self._localIP = IpAddress( localhost[0] ) + self._localPort = NetworkPort( localhost[1] ) + self.log.info( 'Local host: %s:%i' % ( self.localIP.getAscii(), self.localPort.getInteger() ) ) + + if validateTypes( {remotehost: tuple} ) and len( remotehost ) == 2: + self._remoteIP = IpAddress( remotehost[0] ) + self._remotePort = NetworkPort( remotehost[1] ) + self.log.info( 'Remote host: %s:%i' % ( self.remoteIP.getAscii(), self.remotePort.getInteger() ) ) + + # Set loopback mode + if self.remoteIP == self.localIP or \ + testconfig.interface in self.loopbackInterfaces: + + # If the user didn't specify a loopback interface, print a warning. + if testconfig.interface not in self.loopbackInterfaces: + self.log.warn( 'Automatically setting loopback mode to \'True\'' ) + + self.setLoopback( True ) + + # Reset the internal state. + self.reset() + # @uint32 @prop def snd_nxt(): @@ -202,7 +249,7 @@ self.inboundSequences.base = irs self.inboundSequences = [] del self.inboundSequences[:] - # self.inboundSequences = segmentBuffer([], irs, self.inboundSequences.limit) + # self.inboundSequences = SegmentBuffer([], irs, self.inboundSequences.limit) # self.inboundSequences.base = irs # @uint32 @@ -249,20 +296,29 @@ _remoteEthernet = HwAddress( default = testconfig.remoteMAC ) @prop - def localIP(): '''Local IP address.''' - _localIP = IpAddress( default = testconfig.localIP ) + def localIP(): + '''Local IP address.''' + return {'fset': lambda self, x: self.localIP.set( x ) } @prop - def remoteIP(): '''Remote IP address.''' - _remoteIP = IpAddress( default = testconfig.remoteIP ) + def remoteIP(): + '''Remote IP address.''' + return {'fset': lambda self, x: self.remoteIP.set( x ) } @prop - def localPort(): '''Local port.''' - _localPort = NetworkPort( default = testconfig.localPort ) + def localPort(): + '''Local port.''' + return {'fset': lambda self, x: self.localPort.set( x ) } @prop - def remotePort(): '''Remote port.''' - _remotePort = NetworkPort( default = testconfig.remotePort ) + def remotePort(): + '''Remote port.''' + return {'fset': lambda self, x: self.remotePort.set( x ) } + + def setRemotePort( self, x ): + import traceback + traceback.print_stack() + self._remotePort = x @prop def userTimer(): ''' User timeout timer ''' @@ -274,21 +330,21 @@ # Interface is actually a shortcut to the connector's interface field, # which is itself a property. Setting a TcpStateMachine's interface will - # effectively trigger the tcpFilter object to switch interfaces to the + # effectively trigger the TcpFilter object to switch interfaces to the # specified interface. @prop def interface(): '''Interface to use for sending/recving data''' - return {'fget': lambda self: self.__connector.interface, - 'fset': lambda self, x: setattr( self.__connector, 'interface', x )} + return {'fget': lambda self: self.__tcpFilter.interface, + 'fset': lambda self, x: setattr( self.__tcpFilter, 'interface', x )} def getConnector( self ): ''' Retrieves the connector used to send and receive packets. Note: The name 'connector' is simply for consistency. The object that is - returned is actually a tcpFilter object. + returned is actually a TcpFilter object. ''' - return self.__connector + return self.__tcpFilter def setLoopback( self, lb = True ): ''' @@ -346,42 +402,12 @@ tcp.f_window: True, tcp.f_urg_pointer: True } - @prop - def validate(): - ''' - Dictionary of fields to be validated. Non - valid packets are dropped, non - valid - settings throw an error. Accepted Values: - %s - Incoming packet TCP Checksum - %s - Incoming packet TCP Sequence Number - %s - Incoming packet TCP Ack number - %s - TCP State Machine transition - ''' - _validate = { tcp.f_checksum: True, - tcp.f_sequence: True, - tcp.f_ack_number: True, - #tcp.f_sport: True, - #tcp.f_dport: True, - 'transition': True } - def generateISS( self ): ''' Generates a new Initial Sequence Number (ISS). ''' return seq( random.randint( 0, ( 1 << 32 ) - 1 ) ) -# def _updateSegmentBuffer(self, newBuffer, oldBuffer): -# # If it's the same object, just return it. -# if newBuffer is oldBuffer: -# return segmentBuffer -# -# # If it is not of the correct type, make it into a segment buffer, -# # preserving the current base and max. -# if type(newBuffer) is not segmentBuffer: -# return segmentBuffer(newBuffer, base=oldBuffer.base, max=oldBuffer.max) -# -# # If it is the correct type, but is just a new buffer, assign it. -# return newBuffer - @prop def outboundSequences(): ''' @@ -399,7 +425,6 @@ sequence number overflows 2**32. ''' return {'fset': lambda self, x: self.outboundSequences.update( x ) } - _outboundSequences = segmentBuffer() @prop def inboundSequences(): @@ -408,7 +433,6 @@ @see outboundSequences for more information. ''' return { 'fset': lambda self, x: self.inboundSequences.update( x ) } - _inboundSequences = segmentBuffer() @prop def retransmissionQ(): @@ -424,11 +448,24 @@ Note that this buffer will explicitly exclude all SYN and FIN sequences. @see inboundSequences ''' - return {'fget': - lambda self: - [octet for octet in self.inboundSequences[seq( self.recvBufferOffset + self.irs ):] \ - if type( octet ) != sequenced or not ( octet.syn or octet.ack ) ] - } + return {'fget': lambda self: [octet for octet in self.inboundSequences ] } + + # Old version didn't work, because Sequenced objects would go into inboundSequences. + # This would, in turn, cause a single byte to be received twice once the buffer + # was incremented, because the number of bytes that were incremented would not + # take into account the SYN or SYN/ACK at the beginning of the buffer that did not + # get returned by recvBuffer + #[octet for octet in self.inboundSequences[seq( self.recvBufferOffset + self.irs ):] \ + # if type( octet ) != Sequenced or not ( octet.syn or octet.ack ) ] + + + def getRecvBuffer( self ): + print "%i bytes in inboundSequences" % len( self.inboundSequences ) + print "IRS: %i Rcv Offset: %i" % ( self.irs, self.recvBufferOffset ) + return [octet for octet in self.inboundSequences ] + # if type( octet ) != Sequenced or not ( octet.syn or octet.ack ) ] + + @prop def recvBufferOffset(): @@ -491,7 +528,6 @@ def __str__( self ): ''' Prints out the annotated status. - ''' statusNames = ['local socket', 'foreign socket', @@ -559,6 +595,8 @@ Close the socket connection. This is synonymous with the 'close' function used by normal UNIX sockets. ''' + self.log.info( "Closing connection" ) + #CLOSED STATE (i.e., TCB does not exist) if self.state is CLOSED: # If the user does not have access to such a connection, return @@ -587,13 +625,13 @@ # then form a FIN segment and send it, and enter FIN-WAIT-1 state; # otherwise queue for processing after entering ESTABLISHED state. if self.snd_una == self.snd_nxt: - self.outboundSequences += [seq_fin] #sequenced(fin=1)] + self.outboundSequences += [seq_fin] #Sequenced(fin=1)] #ESTABLISHED STATE elif self.state is ESTABLISHED: # Queue this until all preceding SENDs have been segmentized, then # form a FIN segment and send it. In any case, enter FIN-WAIT-1 # state. - self.outboundSequences += [seq_fin]# sequenced(fin=1)] + self.outboundSequences += [seq_fin]# Sequenced(fin=1)] self.state = FIN_WAIT_1 #FIN-WAIT-1 STATE @@ -609,7 +647,7 @@ elif self.state is CLOSE_WAIT: # Queue this request until all preceding SENDs have been # segmentized; then send a FIN segment, enter CLOSING state. - self.outboundSequences += [seq_fin] #sequenced(fin=1)] + self.outboundSequences += [seq_fin] #Sequenced(fin=1)] #CLOSING STATE #LAST-ACK STATE @@ -658,7 +696,7 @@ # snd_una is automatically handled # self.snd_una = self.iss self.state = SYN_SENT - self.outboundSequences = [seq_syn]# sequenced(syn=True)] + self.outboundSequences = [seq_syn]# Sequenced(syn=True)] self.__sendThread.timeout.trigger() # SNX_NXT is automatically incremented @@ -721,11 +759,12 @@ del self.outboundSequences[:] del self.inboundSequences[:] + self.recvBufferOffset = 0 # self.recvBufferOffset = 0 self.state = CLOSED - @synchronized( lock ) +# @synchronized( sendLock ) def sendPacket( self, packet ): ''' Inform the TCP State Machine about packets that have been transmitted. @@ -746,7 +785,7 @@ # Check for a syn if seg.syn: self.iss = seg.sequence - self.outboundSequences += [sequenced( syn = seg.syn, ack = seg.ack )] + self.outboundSequences += [Sequenced( syn = seg.syn, ack = seg.ack )] if self.state is CLOSED or ( self.state is LISTEN and not seg.ack ): self.state = SYN_SENT @@ -755,17 +794,21 @@ # Check for other data if seg.data is not None: - self.outboundSequences += [sequenced( octet ) for octet in seg.data] + # It is unnecessary to use the Sequenced class on raw data. It + # provides no extra utility, and simply causes excess memory usage. + # self.outboundSequences += [Sequenced( octet ) for octet in seg.data] + self.outboundSequences += [octet for octet in seg.data] # Check for a fin if seg.fin: - self.outboundSequences += [sequenced( fin = seg.fin, ack = seg.ack )] + self.outboundSequences += [Sequenced( fin = seg.fin, ack = seg.ack )] if self.state == ESTABLISHED: self.state = FIN_WAIT_1 self.__sendThread.timeout.trigger() +# @synchronized( recvLock ) def recvRawTcp( self, block = True ): ''' Receives a single packet off of the interface that is destined for @@ -773,11 +816,12 @@ @param block: Set to True to block until a packet is available. ''' - rv = self.__connector.readFilteredByTuple( self.localIP, self.localPort, block ) + rv = self.__tcpFilter.read() if rv is not None: return findTcpLayer( rv ) return rv +# @synchronized( sendLock ) def sendRawTcp( self, tcpLayer ): ''' Packages the provided TCP packet and payload inside of the appropriate @@ -788,17 +832,20 @@ ''' # self.log.debug( repr( tcpLayer ) ) + self.log.debug( 'In sendRawTcp()' ) if not validateTypes( {tcpLayer:tcp.tcp} ): + self.log.error( "tcpLayer was incorrect type: %s" % type( tcpLayer ) ) return + # self.__socket.sendto(tcpLayer.chain().bytes, (self.remoteIP().getAscii(),self.remotePort().getAscii())) - # self.__socket.sendto(tcpLayer.chain().bytes, (self.remoteIP().getAscii(),self.remotePort().getAscii())) chain = self.__constructor.generateChain( tcpLayer ) + self.log.debug( 'Built chain' ) self.log.pktsent( repr ( tcpLayer.chain() ) ) - self.__connector.write( chain.bytes ) + self.__tcpFilter.write( chain.bytes ) pass - @synchronized( lock ) +# @synchronized( sendLock ) def send( self, data, async = False ): ''' Sends the specified data. @@ -810,24 +857,28 @@ Optionally send asynchronously. Do not wait for acknowledgement. ''' if not validateTypes( {data:str} ): - return - 1 + self.log.error( 'Can only send strings' ) + return ( -1 ) if self.state == CLOSED: self.log.error( 'connection does not exist' ) - return - 1 + return ( -1 ) if self.state == LISTEN: + self.log.info( 'attempted send() on LISTENing connection.'\ + ' performing active-open' ) self.open() if self.state in ( SYN_RECEIVED, SYN_SENT ): # Queue the data for send if we are not in the connected state. + self.log.debug( 'Added %i bytes to the outboundSequences queue' % len( data ) ) self.outboundSequences += [x for x in data] if self.state in ( ESTABLISHED, CLOSE_WAIT ): # Add the data to the outbound buffer. firstSeq = self.snd_nxt - self.outboundSequences += [sequenced( octet ) for octet in data] + self.outboundSequences += [octet for octet in data] # [Sequenced(octed) for octed in data] lastSeq = self.snd_nxt - 1 # Break the data up into MSS-sized chunks @@ -840,23 +891,29 @@ pkt = self.newPacket( {tcp.f_data: payload.payload( chunk ), tcp.f_sequence: sequence } ) - self.sendPacket( pkt ) + self.sendRawTcp( pkt ) sequence += len( chunk ) + # If packet auto-processing has been disabled and the call is + # blocking, this could lead to a deadlock. + if not async and not self.processPacketsOnArrival: + self.log.warn( 'Performed blocking send with processPacketsOnArrival disabled.' \ + ' This can lead to a deadlock situation because ACKs may not ' \ + 'be received and processed.' ) + # This is a blocking call, we must wait until all of the data has # been acknowledged. if not async: timer( 5.0, lambda: self.snd_una > lastSeq ).wait() - #timeout = timer(timeout=5) - #while (not async) and (self.snd_una <= lastSeq) and not timeout.expired: - # time.sleep(0.01) - return len( pkt.data ) elif self.state in ( FIN_WAIT_1, FIN_WAIT_2, CLOSING, LAST_ACK, TIME_WAIT ): self.log.error( 'connection closing' ) return - 1 + + self.__sendThread.timeout.trigger() + def logGenerated( self, packet = None, fieldname = None ): ''' Helper method to log generated field data. @@ -982,7 +1039,7 @@ tcp.f_ack_number: self.rcv_nxt, tcp.f_ack: 1} - print "Sending ack: {seq: %i, ack: %i, ack#: %i}" % ( self.snd_nxt, 1, self.rcv_nxt ) + self.log.debug( "Sending ack: {seq: %i, ack: %i, ack#: %i}" % ( self.snd_nxt, 1, self.rcv_nxt ) ) if fields is not None: ackFields.update( fields ) @@ -1032,8 +1089,8 @@ return packet - @synchronized( lock ) - def recv( self, numOctets = 4096, delay = 120 ): +# @synchronized( recvLock ) + def recv( self, numOctets = 4096, timeout = 120, fillBuffer = False ): ''' Returns *up to* numOctets octets of data. Returns the octets as a string. @@ -1042,28 +1099,50 @@ @param numOctets: Maximum number of octets to get. (Default 4096) - @param delay: + @param timeout: Maximum amount of time, in seconds, to wait for data to be available. (Default 2 minutes) + @param fillBuffer: + Attempt to fill the buffer within the timeout, instead of returning + immediately when data is available. Return when the timeout expires + or when the numOctets octest have been read. @return: Returns the octets as a string, or None if no octets were available before the operation timed out. ''' - t = timer( delay ) - while len( self.recvBuffer ) == 0 and not t.expired: - time.sleep( 0.05 ) + t = timer( timeout ) + octets = [] + + # Keep looping until [1] we have received data or [2] we have filled the + # buffer, with the condition that looping will stop *immediately* when + # the timer expires. + while ( ( len( octets ) == 0 and len( octets ) < numOctets ) or \ + ( fillBuffer and len( octets ) < numOctets ) ) and \ + not t.expired: + + # Are there any bytes? Append them to octets. + if len( self.recvBuffer ) > 0: + # self.log.debug( 'Recv buffer has %i octets' % len( self.recvBuffer ) ) + # What is our length now? We need to know this for below. + lengthBefore = len( octets ) + + # Append the octets + octets += self.recvBuffer[:numOctets - len( octets )] - if len( self.recvBuffer ) > 0: - octets = self.recvBuffer[:numOctets] - self.recvBufferOffset += len( octets ) + # Increment recvBufferOffset by the number of octets received + # during just this one pass. + self.recvBufferOffset += ( len( octets ) - lengthBefore ) + else: + time.sleep( 0.01 ) + if len( octets ) > 0: return ''.join( octets ) - else: - return None + + return None - @synchronized( lock ) +# @synchronized( recvLock ) def recvPacket( self, packet = None, timeout = None ): ''' Get the next packet that has been received. Packets are guaranteed @@ -1080,7 +1159,7 @@ self.log.error( "Use segmentArrives(packet)" ) raise DeprecationWarning - @synchronized( lock ) +# @synchronized( recvLock ) def segmentArrives( self, packet = None ): ''' Inform the TCP State Machine about packets that have been received. @@ -1092,7 +1171,7 @@ # Make sure that there is TCP data in the packet. seg = findTcpLayer( packet ) - self.log.info( "Segment arrived!" ) + self.log.debug( "Segment arrived" ) if seg is None: self.log.warn( 'Could not find TCP layer in packet' ) @@ -1164,18 +1243,18 @@ # <SEQ=SEG.ACK><CTL=RST> pass # We don't check security/compartment self.debugWithState( "Received SYN" ) + # Set RCV.NXT to SEG.SEQ+1, IRS is set to SEG.SEQ and any other # control or text should be queued for processing later. ISS # should be selected and a SYN segment sent of the form: # <SEQ=ISS><ACK=RCV.NXT><CTL=SYN,ACK> self.irs = seg.sequence - # self.rcv_nxt = seg.sequence + 1 # Done automatically by property self.iss = self.generateISS() - # self.snd_nxt = self.iss # Done automatically by property - self.inboundSequences += [sequenced( syn = seg.syn, ack = seg.ack, fin = seg.fin )] # sequenced(syn=1)] + self.inboundSequences += 1 + # self.inboundSequences += [Sequenced( syn = seg.syn, ack = seg.ack, fin = seg.fin )] # pkt = self.newPacket( {tcp.f_sequence:self.iss, tcp.f_ack_number: self.rcv_nxt, tcp.f_ack:1, tcp.f_syn: 1} ) - self.outboundSequences += [seq_synack]#sequenced(syn=1)] + self.outboundSequences += [seq_synack]#Sequenced(syn=1)] pkt = self.newPacket( {tcp.f_syn:1, tcp.f_ack:1, tcp.f_ack_number: seg.sequence} ) self.sendPacket( pkt ) @@ -1267,7 +1346,8 @@ # SEG.SEQ. # self.rcv_nxt = seg.sequence + 1 # Done automagically self.irs = seg.sequence - self.inboundSequences += [sequenced( syn = seg.syn, ack = seg.ack, fin = seg.fin )] # [sequenced(syn=1)] + self.inboundSequences += 1 + # self.inboundSequences += [Sequenced( syn = seg.syn, ack = seg.ack, fin = seg.fin )] # [Sequenced(syn=1)] # SND.UNA should be advanced to equal SEG.ACK (if there # is an ACK), and any segments on the retransmission queue which @@ -1363,6 +1443,7 @@ if seg.data is not None and self.rcv_wnd == 0: acceptable = False # for good measure + if seg.data is not None and self.rcv_wnd > 0 and \ ( ( self.rcv_nxt <= seg.sequence < ( self.rcv_nxt + self.rcv_wnd ) ) or ( self.rcv_nxt <= ( seg.sequence + len( seg.data ) - 1 ) < ( self.rcv_nxt + self.rcv_wnd ) ) ): @@ -1374,6 +1455,7 @@ # # <SEQ=SND.NXT><ACK=RCV.NXT><CTL=ACK> if not acceptable: + self.log.debug( "Ack is unacceptable!" ) if seg.reset: return None @@ -1382,7 +1464,8 @@ # After sending the acknowledgment, drop the unacceptable segment # and return. return None - + else: + self.log.debug( "Processing acceptable ACK" ) # In the following it is assumed that the segment is the idealized # segment that begins at RCV.NXT and does not exceed the window. # One could tailor actual segments to fit this assumption by @@ -1400,6 +1483,7 @@ # came from the LISTEN state), then return this connection to # LISTEN state and return. The user need not be informed. if self._lastState == LISTEN: + self.log.debug( "Received RST, restoring connection to %s state." % LISTEN ) self.state = LISTEN return seg @@ -1416,6 +1500,7 @@ # And in the active OPEN case, enter the CLOSED state and delete the TCB, # and return. if self._lastState == SYN_SENT: + self.log.debug( "Received RST packet, resetting connection state" ) self.reset() return seg @@ -1429,6 +1514,7 @@ # flushed. Users should also receive an unsolicited general # "connection reset" signal. Enter the CLOSED state, delete the # TCB, and return. + self.log.debug( "Received RST packet, resetting connection state" ) self.reset() return seg @@ -1438,6 +1524,7 @@ if self.state in ( CLOSING, LAST_ACK, TIME_WAIT ): # If the RST bit is set then, enter the CLOSED state, delete the # TCB, and return. + self.log.debug( "Received RST packet, resetting connection state" ) self.reset() return seg @@ -1465,6 +1552,7 @@ # If the SYN is not in the window this step would not be reached # and an ack would have been sent in the first step (sequence # number check). + self.log.debug( "Received out-of-place SYN, sending RST and resetting connection state" ) self.sendRst( seg ) self.reset() return seg @@ -1472,6 +1560,7 @@ # fifth check the ACK field, # if the ACK bit is off drop the segment and return if not seg.ack: + self.log.debug( "ACK bit was not set, dropping segment" ) return None # if the ACK bit is on @@ -1489,6 +1578,7 @@ # <SEQ=SEG.ACK><CTL=RST> # and send it. if not acceptable: + self.log.debug( "ACK is not acceptable, sending RST" ) self.sendRst( seg ) # TODO TODO TODO # Might have to return here. RFC doesn't say to. @@ -1511,12 +1601,14 @@ # If the ACK is a duplicate (SEG.ACK < SND.UNA), it can be ignored. elif seg.ack_number < self.snd_una: + self.log.debug( 'Received duplicate ack, ignoring' ) pass # If the ACK acks # something not yet sent (SEG.ACK > SND.NXT) then send an ACK, # drop the segment, and return. elif seg.ack_number > self.snd_nxt: + self.log.debug( 'Received ACK for something not yet sent, sending ACK and dropping the segment' ) self.sendAck() return None @@ -1600,16 +1692,19 @@ # the 2 MSL timeout. if seg.fin: + self.log.debug( 'Acknowledging remote FIN with ACK' ) self.sendAck() - # TODO TODO TODO - # Implement TIme-Wait timeout + self.log.debug( 'Starting 2msl %s timeout' % TIME_WAIT ) + self.restartTimeWaitTimeout( 2 * self.msl ) + # sixth, check the URG bit pass # Not supported # seventh, process the segment text if seg.data is not None: + self.log.debug( 'Segment contains data' ) # ESTABLISHED STATE # FIN-WAIT-1 STATE # FIN-WAIT-2 STATE @@ -1618,11 +1713,17 @@ # Once in the ESTABLISHED state, it is possible to deliver segment # text to user RECEIVE buffers. Text from segments can be moved # into buffers until either the buffer is full or the segment is - # empty. - bytes = seg.data.chain().bytes + # empty. + # byteArray = [byte for byte in bytes[]] + + if seg.sequence == self.rcv_nxt: + self.inboundSequences += [byte for byte in seg.data.chain().bytes] + #self.log.debug( 'Setting inboundSequences[%i:%i] (%s) to recvd bytes' % + # ( seg.sequence, seg.sequence + len( byteArray ), self.inboundSequences ) ) - self.inboundSequences[seg.sequence:seg.sequence + len( bytes )] = \ - [sequenced( byte ) for byte in bytes] + # self.inboundSequences[seg.sequence:( seg.sequence + len( byteArray ) )] = byteArray + self.log.debug( 'Length of inboundSequences is now %i' % len( self.inboundSequences ) ) + # [Sequenced( byte ) for byte in bytes] # Don't really need to use the Sequenced() for raw data. #If the segment empties and carries an PUSH flag, then #the user is informed, when the buffer is returned, that a PUSH @@ -1675,7 +1776,8 @@ # user. self.debugWithState( "connection closing" ) - self.inboundSequences += [sequenced( fin = seg.fin, syn = seg.syn, ack = seg.ack )] + self.inboundSequences += 1 + # self.inboundSequences += [Sequenced( fin = seg.fin, syn = seg.syn, ack = seg.ack )] # self.rcv_nxt = seg.sequence + 1 # Done automatically by above self.debugWithState( "sending ACK of FIN" ) @@ -1725,8 +1827,11 @@ # And return return seg - self.log.error( 'This point should never be reached!' ) - return None + # If we get here, the packet was just an ACK packet that may or + # may not have had data. Otherwise, something interesting happened. + if not seg.ack or ( seg.syn or seg.fin or seg.reset ): + self.log.error( "Should never get here. Something weird happened." ) + return seg def turnOffTimers( self ): ''' @@ -1740,72 +1845,60 @@ self.stopTimeWaitTimeout() def stopUserTimeout( self ): + ''' + Stops the User Timeout. + ''' if self.userTimer is not None: self.userTimer.cancel() self.userTimer = None def restartUserTimeout( self, timeout = 10.0 ): + ''' + Stops, then starts the User Timeout. + ''' self.stopUserTimeout() self.userTimer = threading.Timer( timeout, TcpStateMachine.setState, args = ( self, CLOSED ) ) self.userTimer.start() def stopTimeWaitTimeout( self ): + ''' + Stops the TIME WAIT timeout. + ''' if self.timeWaitTimer is not None: self.timeWaitTimer.cancel() self.timeWaitTimer = None def restartTimeWaitTimeout( self, timeout = 10.0 ): + ''' + Stops, then starts the TIME WAIT timeout. + ''' self.stopTimeWaitTimeout() self.timeWaitTimer = threading.Timer( timeout, TcpStateMachine.setState, args = ( self, CLOSED ) ) self.timeWaitTimer.start() def forceResendUnackedData( self ): - self.resetRetransmitTimer() + ''' + Forces the Send thread to stop waiting and immediately send any perform + a send of pending data. + ''' + self.__sendThread.timeout.trigger() def resetRetransmitTimer( self ): + ''' + Resets the retransmit timer. + ''' self.__sendThread.timeout.reset() def debugWithState( self, x ): + ''' + Prints a DEBUG statement that includes the current TCP state. + ''' self.log.debug( "[%s] %s" % ( self.state, x ) ) - def __init__( self, localhost = ( testconfig.localIP, testconfig.localPort ), remotehost = ( testconfig.remoteIP, testconfig.remotePort ) ): + def unackedFins( self ): ''' - Initialize the TCP State Machine. The information for the local host and the - remote host must be provided as a 2 - tuple, a string and an integer. - - Example: - >>> t = TcpStateMachine( ( "127.0.0.1", 10000 ), ( "192.168.0.1", 80 ) ) + Returns a list of all FIN sequences in the outbound queue that have >>> TRUNCATED FOR MAIL (1000 lines) <<<
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?200908170650.n7H6ovNw003066>