Skip site navigation (1)Skip section navigation (2)
Date:      Mon, 17 Aug 2009 06:50:57 GMT
From:      Zachariah Riggle <zjriggl@FreeBSD.org>
To:        Perforce Change Reviews <perforce@FreeBSD.org>
Subject:   PERFORCE change 167424 for review
Message-ID:  <200908170650.n7H6ovNw003066@repoman.freebsd.org>

next in thread | raw e-mail | index | archive | help
http://perforce.freebsd.org/chv.cgi?CH=167424

Change 167424 by zjriggl@zjriggl_tcpregression on 2009/08/17 06:50:01

	Final commit for SoC.  Everything should work after this.  I'm gonna download it all into my FreeBSD VM and make sure the tests run.

Affected files ...

.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/CHANGES#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/LICENSE#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/Makefile#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/README#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/pcap.pyx#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/pcap_ex.c#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/pcap_ex.h#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/setup.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/tests/__init__.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/tests/packetFilterTest.pcap#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/tests/test.pcap#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/tests/testPcap.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/testsniff.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/echoServer.py#6 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcs/pcap/CHANGES#4 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcs/pcap/README#4 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/StringField.py#4 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/__init__.py#10 edit
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/backup.tar#4 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/hwAddress.py#5 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/hwaddress.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/ipAddress.py#7 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/ipaddress.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/networkPort.py#6 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/networkport.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/payload.py#5 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/pseudoipv4.py#4 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/segmentBuffer.py#4 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/segmentbuffer.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/sequence.py#2 edit
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/sniffLocalhost.py#6 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpConstructor.py#7 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpFilter.py#8 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpconstructor.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpfilter.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpstatemachine.py#10 edit
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/test.html#6 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/test.py#2 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/__init__.py#2 edit
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/echoclient.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/echoserver.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/logging.conf#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/segmentBufferTest.py#2 edit
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/tcpFilterTest.py#2 edit
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/test.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testSequence.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testStates.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testTcpConstructor.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testTcpHandshake.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testTcpStateMachine.py#1 add

Differences ...

==== //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/__init__.py#10 (text+ko) ====

@@ -4,6 +4,7 @@
 import socket
 import sys
 import re
+import binascii
 from pcs.packets.ipv4 import ipv4
 from pcs.packets.tcp import tcp
 from pcs.packets.tcpv6 import tcpv6
@@ -37,4 +38,3 @@
     x = re.sub( octet, r"\1   ", x )
     x = re.sub( twoBytes, r"\1\n", x )
     return x
-

==== //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/sequence.py#2 (text+ko) ====

@@ -5,55 +5,69 @@
 '''
 from pcsextension import findTcpLayer
 
-def seq(x):
+def seq( x ):
     # It is essential to remember that the actual sequence number space is
     # finite, though very large.  This space ranges from 0 to 2**32 - 1.
     # Since the space is finite, all arithmetic dealing with sequence
     # numbers must be performed modulo 2**32.  This unsigned arithmetic
     # preserves the relationship of sequence numbers as they cycle from
     # 2**32 - 1 to 0 again.
-    return x % (2**32)
+    return x % ( 2 ** 32 )
 
-class sequenced(str):
+class Sequenced( str ):
     '''
     This class exists to encapsulate sequenced items in the TCP stream.  Each
     byte (octet) is assigned a sequence number, as is the original SYN and final
-    FIN. 
+    FIN.
+    
+    Raw data CAN be encapsulated in a Sequenced object, but doing so will use
+    additional memory and is not necessary.  The Sequenced class should only
+    be used to denote special sequences that have either the SYN or FIN bits
+    set (each of which can have the ACK bits set).
+    
+    For example, a data stream may look like the following...
+    
+    Client
+        [SYN]          datadatadatadata
+             [SYN\ACK]     
+    Server
     '''
     syn = False
     fin = False
     ack = False
-    
-    def __new__(cls, data='', syn=False, fin=False, ack=True):
-        # New has to be used, because the str type is immutable.
-        if len(data) == 0:
-            if syn:
-                data += "[SYN]"
-            if fin:
-                data += "[FIN]"
-            if ack:
-                data += "[ACK]"
-                
-        obj = super( sequenced, cls).__new__( sequenced, data or "" )
+
+    # New has to be used, because the str type is immutable.
+    def __new__( cls, syn = False, fin = False, ack = False ):
+
+        # Don't overwrite any user-provided data.
+        data = ''
+        if syn and not ack:
+            data += "[SYN]"
+        elif fin and not ack:
+            data += "[FIN]"
+        elif syn and ack:
+            data += "[SYN\ACK]"
+        elif fin and ack:
+            data += "[FIN\ACK]"
+
+        # Check for invalid flag combos.
+        if ack and not ( syn or fin ):
+            raise Warning, "The 'ack' bit should not be set by itself.  This is not " \
+            "the intended use of the class Sequenced."
+        if not ( syn or ack or fin ):
+            raise Warning, "Did not set any flags."
+
+        obj = super( Sequenced, cls ).__new__( Sequenced, data or "" )
+
+        # The below statements allow us to use True/False or 1/0 or any other
+        # expression that can be evaluated to a boolean.
         obj.syn = True if syn else False
         obj.ack = True if ack else False
         obj.fin = True if fin else False
-        return obj    
-    
-def makeSequenced(packet):
-    
-    t = findTcpLayer(packet)
-    
-    if t.syn or t.fin:
-        return [sequenced(syn=t.syn, ack=t.ack, fin=t.fin)]    
-    elif t.data is not None:
-        return [sequenced(byte,ack=t.ack) for byte in t.data]
-    
-    return []
-    
+        return obj
 
 # Special cases defined here for convenience...
-seq_syn = sequenced(syn=True,ack=False)
-seq_synack = sequenced(syn=True,ack=True)
-seq_fin = sequenced(fin=True,ack=False)
-seq_finack = sequenced(fin=True,ack=True)+seq_syn = Sequenced( syn = True, ack = False )
+seq_synack = Sequenced( syn = True, ack = True )
+seq_fin = Sequenced( fin = True, ack = False )
+seq_finack = Sequenced( fin = True, ack = True )

==== //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpstatemachine.py#10 (text+ko) ====

@@ -9,20 +9,20 @@
 from pcsextension import findTcpLayer
 from pcsextension.checksum import tcpChecksum
 from pcsextension.decorators import prop, validateTypes, uint16, uint32, synchronized
-from pcsextension.hwAddress import HwAddress
-from pcsextension.ipAddress import IpAddress
-from pcsextension.networkPort import NetworkPort
-from segmentBuffer import segmentBuffer
-from sequence import seq, sequenced, makeSequenced, seq_fin, seq_finack, seq_syn, seq_synack
-from tcpConstructor import tcpConstructor
-from tcpFilter import tcpFilter
+from pcsextension.hwaddress import HwAddress
+from pcsextension.ipaddress import IpAddress
+from pcsextension.networkport import NetworkPort
+from segmentbuffer import SegmentBuffer
+from sequence import seq, Sequenced, seq_fin, seq_finack, seq_syn, seq_synack
+from tcpconstructor import tcpConstructor
+from tcpfilter import TcpFilter
 from tcprecvdaemon import TcpRecvDaemon
 from tcpsenddaemon import TcpSendDaemon
 from tcpstates import CLOSE_WAIT, CLOSED, CLOSING, ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, \
     LAST_ACK, LISTEN, SYN_RECEIVED, SYN_SENT, TIME_WAIT, TcpState, synchronizedStates, tcpStates
 from time import time
 from threading import RLock
-import pcap
+import pcs.pcap as pcap
 import pcs
 import testconfig
 import time
@@ -116,12 +116,59 @@
     3
     '''
 
-    __constructor = tcpConstructor()
-    __connector = tcpFilter( testconfig.interface )
+    __constructor = None
+    __tcpFilter = None
     __recvThread = None
     __sendThread = None
     lock = RLock()
+    sendLock = RLock()
+    recvLock = RLock()
+
+    def __init__( self, localhost = ( testconfig.localIP, testconfig.localPort ),
+                  remotehost = ( testconfig.remoteIP, testconfig.remotePort ) ):
+        '''
+        Initialize the TCP State Machine.  The information for the local host and the
+        remote host must be provided as a 2 - tuple, a string and an integer.
+
+        Example:
+        >>> t = TcpStateMachine( ( "127.0.0.1", 10000 ), ( "192.168.0.1", 80 ) )
+        '''
 
+        # Required objects.
+        # self.lock = RLock()
+        self.__constructor = tcpConstructor()
+        self.__tcpFilter = TcpFilter( testconfig.interface )
+        self._inboundSequences = SegmentBuffer()
+        self._outboundSequences = SegmentBuffer()
+
+        # Logger
+        self.log = tcplog( self )
+        self.log.debug( "(%s, %s)" % ( localhost, remotehost ) )
+
+        # Set IP and host
+        if validateTypes( {localhost: tuple} ) and len( localhost ) == 2:
+            self._localIP = IpAddress( localhost[0] )
+            self._localPort = NetworkPort( localhost[1] )
+            self.log.info( 'Local host: %s:%i' % ( self.localIP.getAscii(), self.localPort.getInteger() ) )
+
+        if validateTypes( {remotehost: tuple} ) and len( remotehost ) == 2:
+            self._remoteIP = IpAddress( remotehost[0] )
+            self._remotePort = NetworkPort( remotehost[1] )
+            self.log.info( 'Remote host: %s:%i' % ( self.remoteIP.getAscii(), self.remotePort.getInteger() ) )
+
+        # Set loopback mode
+        if self.remoteIP == self.localIP or \
+            testconfig.interface in self.loopbackInterfaces:
+
+            # If the user didn't specify a loopback interface, print a warning.
+            if testconfig.interface not in self.loopbackInterfaces:
+                self.log.warn( 'Automatically setting loopback mode to \'True\'' )
+
+            self.setLoopback( True )
+
+        # Reset the internal state.
+        self.reset()
+
     # @uint32
     @prop
     def snd_nxt():
@@ -202,7 +249,7 @@
         self.inboundSequences.base = irs
         self.inboundSequences = []
         del self.inboundSequences[:]
-        # self.inboundSequences = segmentBuffer([], irs, self.inboundSequences.limit)
+        # self.inboundSequences = SegmentBuffer([], irs, self.inboundSequences.limit)
         # self.inboundSequences.base = irs
 
     # @uint32
@@ -249,20 +296,29 @@
     _remoteEthernet = HwAddress( default = testconfig.remoteMAC )
 
     @prop
-    def localIP(): '''Local IP address.'''
-    _localIP = IpAddress( default = testconfig.localIP )
+    def localIP():
+        '''Local IP address.'''
+        return {'fset': lambda self, x: self.localIP.set( x ) }
 
     @prop
-    def remoteIP(): '''Remote IP address.'''
-    _remoteIP = IpAddress( default = testconfig.remoteIP )
+    def remoteIP():
+        '''Remote IP address.'''
+        return {'fset': lambda self, x: self.remoteIP.set( x ) }
 
     @prop
-    def localPort(): '''Local port.'''
-    _localPort = NetworkPort( default = testconfig.localPort )
+    def localPort():
+        '''Local port.'''
+        return {'fset': lambda self, x: self.localPort.set( x ) }
 
     @prop
-    def remotePort(): '''Remote port.'''
-    _remotePort = NetworkPort( default = testconfig.remotePort )
+    def remotePort():
+        '''Remote port.'''
+        return {'fset': lambda self, x: self.remotePort.set( x ) }
+
+    def setRemotePort( self, x ):
+        import traceback
+        traceback.print_stack()
+        self._remotePort = x
 
     @prop
     def userTimer(): ''' User timeout timer '''
@@ -274,21 +330,21 @@
 
     # Interface is actually a shortcut to the connector's interface field,
     # which is itself a property.  Setting a TcpStateMachine's interface will
-    # effectively trigger the tcpFilter object to switch interfaces to the 
+    # effectively trigger the TcpFilter object to switch interfaces to the 
     # specified interface.
     @prop
     def interface():
         '''Interface to use for sending/recving data'''
-        return {'fget': lambda self: self.__connector.interface,
-                'fset': lambda self, x: setattr( self.__connector, 'interface', x )}
+        return {'fget': lambda self: self.__tcpFilter.interface,
+                'fset': lambda self, x: setattr( self.__tcpFilter, 'interface', x )}
 
     def getConnector( self ):
         '''
         Retrieves the connector used to send and receive packets.
         Note: The name 'connector' is simply for consistency.  The object that is
-        returned is actually a tcpFilter object.
+        returned is actually a TcpFilter object.
         '''
-        return self.__connector
+        return self.__tcpFilter
 
     def setLoopback( self, lb = True ):
         '''
@@ -346,42 +402,12 @@
                 tcp.f_window: True,
                 tcp.f_urg_pointer: True }
 
-    @prop
-    def validate():
-        '''
-        Dictionary of fields to be validated.  Non - valid packets are dropped, non - valid
-        settings throw an error.  Accepted Values:
-        %s - Incoming packet TCP Checksum
-        %s - Incoming packet TCP Sequence Number
-        %s - Incoming packet TCP Ack number
-        %s - TCP State Machine transition
-        '''
-    _validate = { tcp.f_checksum: True,
-                tcp.f_sequence: True,
-                tcp.f_ack_number: True,
-                #tcp.f_sport: True,
-                #tcp.f_dport: True,
-                'transition': True }
-
     def generateISS( self ):
         '''
         Generates a new Initial Sequence Number (ISS).
         '''
         return seq( random.randint( 0, ( 1 << 32 ) - 1 ) )
 
-#    def _updateSegmentBuffer(self, newBuffer, oldBuffer):
-#        # If it's the same object, just return it.
-#        if newBuffer is oldBuffer:
-#            return segmentBuffer
-#        
-#        # If it is not of the correct type, make it into a segment buffer,
-#        # preserving the current base and max.
-#        if type(newBuffer) is not segmentBuffer:
-#            return segmentBuffer(newBuffer, base=oldBuffer.base, max=oldBuffer.max)
-#        
-#        # If it is the correct type, but is just a new buffer, assign it.
-#        return newBuffer
-
     @prop
     def outboundSequences():
         '''
@@ -399,7 +425,6 @@
         sequence number overflows 2**32.
         '''
         return {'fset': lambda self, x: self.outboundSequences.update( x ) }
-    _outboundSequences = segmentBuffer()
 
     @prop
     def inboundSequences():
@@ -408,7 +433,6 @@
         @see outboundSequences for more information.
         '''
         return { 'fset': lambda self, x: self.inboundSequences.update( x ) }
-    _inboundSequences = segmentBuffer()
 
     @prop
     def retransmissionQ():
@@ -424,11 +448,24 @@
         Note that this buffer will explicitly exclude all SYN and FIN sequences.
         @see inboundSequences
         '''
-        return {'fget':
-        lambda self:
-            [octet for octet in self.inboundSequences[seq( self.recvBufferOffset + self.irs ):] \
-             if type( octet ) != sequenced or not ( octet.syn or octet.ack ) ]
-        }
+        return {'fget': lambda self: [octet for octet in self.inboundSequences ] }
+
+        # Old version didn't work, because Sequenced objects would go into inboundSequences.
+        # This would, in turn, cause a single byte to be received twice once the buffer
+        # was incremented, because the number of bytes that were incremented would not
+        # take into account the SYN or SYN/ACK at the beginning of the buffer that did not
+        # get returned by recvBuffer
+            #[octet for octet in self.inboundSequences[seq( self.recvBufferOffset + self.irs ):] \
+            # if type( octet ) != Sequenced or not ( octet.syn or octet.ack ) ]
+
+
+    def getRecvBuffer( self ):
+        print "%i bytes in inboundSequences" % len( self.inboundSequences )
+        print "IRS: %i        Rcv Offset: %i" % ( self.irs, self.recvBufferOffset )
+        return [octet for octet in self.inboundSequences ]
+             # if type( octet ) != Sequenced or not ( octet.syn or octet.ack ) ]
+
+
 
     @prop
     def recvBufferOffset():
@@ -491,7 +528,6 @@
     def __str__( self ):
         '''
         Prints out the annotated status.
-
         '''
         statusNames = ['local socket',
           'foreign socket',
@@ -559,6 +595,8 @@
         Close the socket connection.  This is synonymous with the 'close' function used
         by normal UNIX sockets.
         '''
+        self.log.info( "Closing connection" )
+
         #CLOSED STATE (i.e., TCB does not exist)
         if self.state is CLOSED:
             #  If the user does not have access to such a connection, return
@@ -587,13 +625,13 @@
             #  then form a FIN segment and send it, and enter FIN-WAIT-1 state;
             #  otherwise queue for processing after entering ESTABLISHED state.
             if self.snd_una == self.snd_nxt:
-                self.outboundSequences += [seq_fin] #sequenced(fin=1)]
+                self.outboundSequences += [seq_fin] #Sequenced(fin=1)]
         #ESTABLISHED STATE
         elif self.state is ESTABLISHED:
             #  Queue this until all preceding SENDs have been segmentized, then
             #  form a FIN segment and send it.  In any case, enter FIN-WAIT-1
             #  state.
-            self.outboundSequences += [seq_fin]# sequenced(fin=1)]
+            self.outboundSequences += [seq_fin]# Sequenced(fin=1)]
             self.state = FIN_WAIT_1
 
         #FIN-WAIT-1 STATE
@@ -609,7 +647,7 @@
         elif self.state is CLOSE_WAIT:
             #  Queue this request until all preceding SENDs have been
             #  segmentized; then send a FIN segment, enter CLOSING state.
-            self.outboundSequences += [seq_fin] #sequenced(fin=1)]
+            self.outboundSequences += [seq_fin] #Sequenced(fin=1)]
 
         #CLOSING STATE
         #LAST-ACK STATE
@@ -658,7 +696,7 @@
             # snd_una is automatically handled
             # self.snd_una = self.iss
             self.state = SYN_SENT
-            self.outboundSequences = [seq_syn]# sequenced(syn=True)]
+            self.outboundSequences = [seq_syn]# Sequenced(syn=True)]
             self.__sendThread.timeout.trigger()
             # SNX_NXT is automatically incremented
 
@@ -721,11 +759,12 @@
 
         del self.outboundSequences[:]
         del self.inboundSequences[:]
+        self.recvBufferOffset = 0
         # self.recvBufferOffset = 0
 
         self.state = CLOSED
 
-    @synchronized( lock )
+#    @synchronized( sendLock )
     def sendPacket( self, packet ):
         '''
         Inform the TCP State Machine about packets that have been transmitted.
@@ -746,7 +785,7 @@
         # Check for a syn
         if seg.syn:
             self.iss = seg.sequence
-            self.outboundSequences += [sequenced( syn = seg.syn, ack = seg.ack )]
+            self.outboundSequences += [Sequenced( syn = seg.syn, ack = seg.ack )]
 
             if self.state is CLOSED or ( self.state is LISTEN and not seg.ack ):
                 self.state = SYN_SENT
@@ -755,17 +794,21 @@
 
         # Check for other data
         if seg.data is not None:
-            self.outboundSequences += [sequenced( octet ) for octet in seg.data]
+            # It is unnecessary to use the Sequenced class on raw data.  It
+            # provides no extra utility, and simply causes excess memory usage.
+            # self.outboundSequences += [Sequenced( octet ) for octet in seg.data]
+            self.outboundSequences += [octet for octet in seg.data]
 
         # Check for a fin
         if seg.fin:
-            self.outboundSequences += [sequenced( fin = seg.fin, ack = seg.ack )]
+            self.outboundSequences += [Sequenced( fin = seg.fin, ack = seg.ack )]
 
             if self.state == ESTABLISHED:
                 self.state = FIN_WAIT_1
 
         self.__sendThread.timeout.trigger()
 
+#    @synchronized( recvLock )
     def recvRawTcp( self, block = True ):
         '''
         Receives a single packet off of the interface that is destined for
@@ -773,11 +816,12 @@
         
         @param block: Set to True to block until a packet is available.
         '''
-        rv = self.__connector.readFilteredByTuple( self.localIP, self.localPort, block )
+        rv = self.__tcpFilter.read()
         if rv is not None:
             return findTcpLayer( rv )
         return rv
 
+#    @synchronized( sendLock )
     def sendRawTcp( self, tcpLayer ):
         '''
         Packages the provided TCP packet and payload inside of the appropriate
@@ -788,17 +832,20 @@
         '''
         # self.log.debug( repr( tcpLayer ) )
 
+        self.log.debug( 'In sendRawTcp()' )
         if not validateTypes( {tcpLayer:tcp.tcp} ):
+            self.log.error( "tcpLayer was incorrect type: %s" % type( tcpLayer ) )
             return
+        # self.__socket.sendto(tcpLayer.chain().bytes, (self.remoteIP().getAscii(),self.remotePort().getAscii()))
 
-        # self.__socket.sendto(tcpLayer.chain().bytes, (self.remoteIP().getAscii(),self.remotePort().getAscii()))
         chain = self.__constructor.generateChain( tcpLayer )
+        self.log.debug( 'Built chain' )
         self.log.pktsent( repr ( tcpLayer.chain() ) )
-        self.__connector.write( chain.bytes )
+        self.__tcpFilter.write( chain.bytes )
 
         pass
 
-    @synchronized( lock )
+#    @synchronized( sendLock )
     def send( self, data, async = False ):
         '''
         Sends the specified data.
@@ -810,24 +857,28 @@
             Optionally send asynchronously.  Do not wait for acknowledgement.
         '''
         if not validateTypes( {data:str} ):
-            return - 1
+            self.log.error( 'Can only send strings' )
+            return ( -1 )
 
         if self.state == CLOSED:
             self.log.error( 'connection does not exist' )
-            return - 1
+            return ( -1 )
 
         if self.state == LISTEN:
+            self.log.info( 'attempted send() on LISTENing connection.'\
+                            '  performing active-open' )
             self.open()
 
         if self.state in ( SYN_RECEIVED, SYN_SENT ):
             # Queue the data for send if we are not in the connected state.
+            self.log.debug( 'Added %i bytes to the outboundSequences queue' % len( data ) )
             self.outboundSequences += [x for x in data]
 
         if self.state in ( ESTABLISHED, CLOSE_WAIT ):
 
             # Add the data to the outbound buffer.
             firstSeq = self.snd_nxt
-            self.outboundSequences += [sequenced( octet ) for octet in data]
+            self.outboundSequences += [octet for octet in data] # [Sequenced(octed) for octed in data]
             lastSeq = self.snd_nxt - 1
 
             # Break the data up into MSS-sized chunks
@@ -840,23 +891,29 @@
                 pkt = self.newPacket( {tcp.f_data: payload.payload( chunk ),
                                       tcp.f_sequence:  sequence } )
 
-                self.sendPacket( pkt )
+                self.sendRawTcp( pkt )
                 sequence += len( chunk )
 
+            # If packet auto-processing has been disabled and the call is
+            # blocking, this could lead to a deadlock.
+            if not async and not self.processPacketsOnArrival:
+                self.log.warn( 'Performed blocking send with processPacketsOnArrival disabled.' \
+                               '  This can lead to a deadlock situation because ACKs may not '  \
+                               'be received and processed.' )
+
             # This is a blocking call, we must wait until all of the data has
             # been acknowledged.
             if not async:
                 timer( 5.0, lambda: self.snd_una > lastSeq ).wait()
 
-            #timeout = timer(timeout=5)
-            #while (not async) and (self.snd_una <= lastSeq) and not timeout.expired:
-            #    time.sleep(0.01)
-
             return len( pkt.data )
 
         elif self.state in ( FIN_WAIT_1, FIN_WAIT_2, CLOSING, LAST_ACK, TIME_WAIT ):
             self.log.error( 'connection closing' )
             return - 1
+
+        self.__sendThread.timeout.trigger()
+
     def logGenerated( self, packet = None, fieldname = None ):
         '''
         Helper method to log generated field data.
@@ -982,7 +1039,7 @@
                      tcp.f_ack_number: self.rcv_nxt,
                      tcp.f_ack: 1}
 
-        print "Sending ack: {seq: %i, ack: %i, ack#: %i}" % ( self.snd_nxt, 1, self.rcv_nxt )
+        self.log.debug( "Sending ack: {seq: %i, ack: %i, ack#: %i}" % ( self.snd_nxt, 1, self.rcv_nxt ) )
 
         if fields is not None:
             ackFields.update( fields )
@@ -1032,8 +1089,8 @@
 
         return packet
 
-    @synchronized( lock )
-    def recv( self, numOctets = 4096, delay = 120 ):
+#    @synchronized( recvLock )
+    def recv( self, numOctets = 4096, timeout = 120, fillBuffer = False ):
         '''
         Returns *up to* numOctets octets of data.
         Returns the octets as a string.
@@ -1042,28 +1099,50 @@
         
         @param numOctets:  
             Maximum number of octets to get. (Default 4096)
-        @param delay:
+        @param timeout:
             Maximum amount of time, in seconds, to wait for data to be available.
             (Default 2 minutes) 
+        @param fillBuffer:
+            Attempt to fill the buffer within the timeout, instead of returning
+            immediately when data is available.  Return when the timeout expires
+            or when the numOctets octest have been read.
             
         @return: 
             Returns the octets as a string, or None if no octets were available
             before the operation timed out.
         '''
 
-        t = timer( delay )
-        while len( self.recvBuffer ) == 0 and not t.expired:
-            time.sleep( 0.05 )
+        t = timer( timeout )
+        octets = []
+
+        # Keep looping until [1] we have received data or [2] we have filled the 
+        # buffer, with the condition that looping will stop *immediately* when
+        # the timer expires.
+        while ( ( len( octets ) == 0 and len( octets ) < numOctets ) or \
+                ( fillBuffer and len( octets ) < numOctets ) )  and \
+               not t.expired:
+
+            # Are there any bytes?  Append them to octets.
+            if len( self.recvBuffer ) > 0:
+                # self.log.debug( 'Recv buffer has %i octets' % len( self.recvBuffer ) )
+                # What is our length now?  We need to know this for below.
+                lengthBefore = len( octets )
+
+                # Append the octets
+                octets += self.recvBuffer[:numOctets - len( octets )]
 
-        if len( self.recvBuffer ) > 0:
-            octets = self.recvBuffer[:numOctets]
-            self.recvBufferOffset += len( octets )
+                # Increment recvBufferOffset by the number of octets received
+                # during just this one pass.
+                self.recvBufferOffset += ( len( octets ) - lengthBefore )
+            else:
+                time.sleep( 0.01 )
 
+        if len( octets ) > 0:
             return ''.join( octets )
-        else:
-            return None
+
+        return None
 
-    @synchronized( lock )
+#    @synchronized( recvLock )
     def recvPacket( self, packet = None, timeout = None ):
         '''
         Get the next packet that has been received.  Packets are guaranteed
@@ -1080,7 +1159,7 @@
         self.log.error( "Use segmentArrives(packet)" )
         raise DeprecationWarning
 
-    @synchronized( lock )
+#    @synchronized( recvLock )
     def segmentArrives( self, packet = None ):
         '''
         Inform the TCP State Machine about packets that have been received.
@@ -1092,7 +1171,7 @@
         # Make sure that there is TCP data in the packet.
         seg = findTcpLayer( packet )
 
-        self.log.info( "Segment arrived!" )
+        self.log.debug( "Segment arrived" )
 
         if seg is None:
             self.log.warn( 'Could not find TCP layer in packet' )
@@ -1164,18 +1243,18 @@
                 #  <SEQ=SEG.ACK><CTL=RST>
                 pass # We don't check security/compartment
                 self.debugWithState( "Received SYN" )
+
                 # Set RCV.NXT to SEG.SEQ+1, IRS is set to SEG.SEQ and any other
                 # control or text should be queued for processing later.  ISS
                 # should be selected and a SYN segment sent of the form:
                 #  <SEQ=ISS><ACK=RCV.NXT><CTL=SYN,ACK>
                 self.irs = seg.sequence
-                # self.rcv_nxt = seg.sequence + 1 # Done automatically by property
                 self.iss = self.generateISS()
-                # self.snd_nxt = self.iss # Done automatically by property
-                self.inboundSequences += [sequenced( syn = seg.syn, ack = seg.ack, fin = seg.fin )] # sequenced(syn=1)]
+                self.inboundSequences += 1
+                # self.inboundSequences += [Sequenced( syn = seg.syn, ack = seg.ack, fin = seg.fin )]
 
                 # pkt = self.newPacket( {tcp.f_sequence:self.iss, tcp.f_ack_number: self.rcv_nxt, tcp.f_ack:1, tcp.f_syn: 1} )
-                self.outboundSequences += [seq_synack]#sequenced(syn=1)]
+                self.outboundSequences += [seq_synack]#Sequenced(syn=1)]
                 pkt = self.newPacket( {tcp.f_syn:1, tcp.f_ack:1, tcp.f_ack_number: seg.sequence} )
                 self.sendPacket( pkt )
 
@@ -1267,7 +1346,8 @@
                 # SEG.SEQ.  
                 # self.rcv_nxt = seg.sequence + 1 # Done automagically
                 self.irs = seg.sequence
-                self.inboundSequences += [sequenced( syn = seg.syn, ack = seg.ack, fin = seg.fin )] # [sequenced(syn=1)]
+                self.inboundSequences += 1
+                # self.inboundSequences += [Sequenced( syn = seg.syn, ack = seg.ack, fin = seg.fin )] # [Sequenced(syn=1)]
 
                 # SND.UNA should be advanced to equal SEG.ACK (if there
                 # is an ACK), and any segments on the retransmission queue which
@@ -1363,6 +1443,7 @@
             if seg.data is not None and self.rcv_wnd == 0:
                 acceptable = False # for good measure
 
+
             if seg.data is not None and self.rcv_wnd > 0 and \
                 ( ( self.rcv_nxt <= seg.sequence < ( self.rcv_nxt + self.rcv_wnd ) ) or
                  ( self.rcv_nxt <= ( seg.sequence + len( seg.data ) - 1 ) < ( self.rcv_nxt + self.rcv_wnd ) ) ):
@@ -1374,6 +1455,7 @@
             #
             #   <SEQ=SND.NXT><ACK=RCV.NXT><CTL=ACK>
             if not acceptable:
+                self.log.debug( "Ack is unacceptable!" )
                 if seg.reset:
                     return None
 
@@ -1382,7 +1464,8 @@
                 # After sending the acknowledgment, drop the unacceptable segment
                 # and return.
                 return None
-
+            else:
+                self.log.debug( "Processing acceptable ACK" )
             # In the following it is assumed that the segment is the idealized
             # segment that begins at RCV.NXT and does not exceed the window.
             # One could tailor actual segments to fit this assumption by
@@ -1400,6 +1483,7 @@
                 # came from the LISTEN state), then return this connection to
                 # LISTEN state and return.  The user need not be informed.  
                 if self._lastState == LISTEN:
+                    self.log.debug( "Received RST, restoring connection to %s state." % LISTEN )
                     self.state = LISTEN
                     return seg
 
@@ -1416,6 +1500,7 @@
                 # And in the active OPEN case, enter the CLOSED state and delete the TCB,
                 # and return.
                 if self._lastState == SYN_SENT:
+                    self.log.debug( "Received RST packet, resetting connection state" )
                     self.reset()
                     return seg
 
@@ -1429,6 +1514,7 @@
                 # flushed.  Users should also receive an unsolicited general
                 # "connection reset" signal.  Enter the CLOSED state, delete the
                 # TCB, and return.
+                self.log.debug( "Received RST packet, resetting connection state" )
                 self.reset()
                 return seg
 
@@ -1438,6 +1524,7 @@
             if self.state in ( CLOSING, LAST_ACK, TIME_WAIT ):
                 # If the RST bit is set then, enter the CLOSED state, delete the
                 # TCB, and return.
+                self.log.debug( "Received RST packet, resetting connection state" )
                 self.reset()
                 return seg
 
@@ -1465,6 +1552,7 @@
                 # If the SYN is not in the window this step would not be reached
                 # and an ack would have been sent in the first step (sequence
                 # number check).
+                self.log.debug( "Received out-of-place SYN, sending RST and resetting connection state" )
                 self.sendRst( seg )
                 self.reset()
                 return seg
@@ -1472,6 +1560,7 @@
         # fifth check the ACK field,        
         # if the ACK bit is off drop the segment and return
         if not seg.ack:
+            self.log.debug( "ACK bit was not set, dropping segment" )
             return None
 
         # if the ACK bit is on
@@ -1489,6 +1578,7 @@
                 #   <SEQ=SEG.ACK><CTL=RST>
                 # and send it.
                 if not acceptable:
+                    self.log.debug( "ACK is not acceptable, sending RST" )
                     self.sendRst( seg )
                     # TODO TODO TODO
                     # Might have to return here.  RFC doesn't say to.
@@ -1511,12 +1601,14 @@
 
                 # If the ACK is a duplicate (SEG.ACK < SND.UNA), it can be ignored.  
                 elif seg.ack_number < self.snd_una:
+                    self.log.debug( 'Received duplicate ack, ignoring' )
                     pass
 
                 # If the ACK acks
                 # something not yet sent (SEG.ACK > SND.NXT) then send an ACK,
                 # drop the segment, and return.
                 elif seg.ack_number > self.snd_nxt:
+                    self.log.debug( 'Received ACK for something not yet sent, sending ACK and dropping the segment' )
                     self.sendAck()
                     return None
 
@@ -1600,16 +1692,19 @@
                 # the 2 MSL timeout.
 
                 if seg.fin:
+                    self.log.debug( 'Acknowledging remote FIN with ACK' )
                     self.sendAck()
 
-                # TODO TODO TODO
-                # Implement TIme-Wait timeout
+                self.log.debug( 'Starting 2msl %s timeout' % TIME_WAIT )
+                self.restartTimeWaitTimeout( 2 * self.msl )
+
 
         # sixth, check the URG bit
         pass # Not supported
 
         # seventh, process the segment text
         if seg.data is not None:
+            self.log.debug( 'Segment contains data' )
             # ESTABLISHED STATE
             # FIN-WAIT-1 STATE
             # FIN-WAIT-2 STATE
@@ -1618,11 +1713,17 @@
                 # Once in the ESTABLISHED state, it is possible to deliver segment
                 # text to user RECEIVE buffers.  Text from segments can be moved
                 # into buffers until either the buffer is full or the segment is
-                # empty.                    
-                bytes = seg.data.chain().bytes
+                # empty.
+                # byteArray = [byte for byte in bytes[]]
+
+                if seg.sequence == self.rcv_nxt:
+                    self.inboundSequences += [byte for byte in seg.data.chain().bytes]
+                #self.log.debug( 'Setting inboundSequences[%i:%i] (%s) to recvd bytes' %
+                #        ( seg.sequence, seg.sequence + len( byteArray ), self.inboundSequences ) )
 
-                self.inboundSequences[seg.sequence:seg.sequence + len( bytes )] = \
-                   [sequenced( byte ) for byte in bytes]
+                # self.inboundSequences[seg.sequence:( seg.sequence + len( byteArray ) )] = byteArray
+                self.log.debug( 'Length of inboundSequences is now %i' % len( self.inboundSequences ) )
+                   # [Sequenced( byte ) for byte in bytes] # Don't really need to use the Sequenced() for raw data.
 
                 #If the segment empties and carries an PUSH flag, then
                 #the user is informed, when the buffer is returned, that a PUSH
@@ -1675,7 +1776,8 @@
             # user.
             self.debugWithState( "connection closing" )
 
-            self.inboundSequences += [sequenced( fin = seg.fin, syn = seg.syn, ack = seg.ack )]
+            self.inboundSequences += 1
+            # self.inboundSequences += [Sequenced( fin = seg.fin, syn = seg.syn, ack = seg.ack )]
             # self.rcv_nxt = seg.sequence + 1 # Done automatically by above
 
             self.debugWithState( "sending ACK of FIN" )
@@ -1725,8 +1827,11 @@
             # And return
             return seg
 
-        self.log.error( 'This point should never be reached!' )
-        return None
+        # If we get here, the packet was just an ACK packet that may or
+        # may not have had data.  Otherwise, something interesting happened.
+        if not seg.ack or ( seg.syn or seg.fin or seg.reset ):
+            self.log.error( "Should never get here.  Something weird happened." )
+        return seg
 
     def turnOffTimers( self ):
         '''
@@ -1740,72 +1845,60 @@
         self.stopTimeWaitTimeout()
 
     def stopUserTimeout( self ):
+        '''
+        Stops the User Timeout.
+        '''
         if self.userTimer is not None:
             self.userTimer.cancel()
             self.userTimer = None
 
     def restartUserTimeout( self, timeout = 10.0 ):
+        '''
+        Stops, then starts the User Timeout.
+        '''
         self.stopUserTimeout()
         self.userTimer = threading.Timer( timeout, TcpStateMachine.setState, args = ( self, CLOSED ) )
         self.userTimer.start()
 
     def stopTimeWaitTimeout( self ):
+        '''
+        Stops the TIME WAIT timeout.
+        '''
         if self.timeWaitTimer is not None:
             self.timeWaitTimer.cancel()
             self.timeWaitTimer = None
 
     def restartTimeWaitTimeout( self, timeout = 10.0 ):
+        '''
+        Stops, then starts the TIME WAIT timeout.
+        '''
         self.stopTimeWaitTimeout()
         self.timeWaitTimer = threading.Timer( timeout, TcpStateMachine.setState, args = ( self, CLOSED ) )
         self.timeWaitTimer.start()
 
     def forceResendUnackedData( self ):
-        self.resetRetransmitTimer()
+        '''
+        Forces the Send thread to stop waiting and immediately send any perform
+        a send of pending data.
+        '''
+        self.__sendThread.timeout.trigger()
 
     def resetRetransmitTimer( self ):
+        '''
+        Resets the retransmit timer.
+        '''
         self.__sendThread.timeout.reset()
 
     def debugWithState( self, x ):
+        '''
+        Prints a DEBUG statement that includes the current TCP state.
+        '''
         self.log.debug( "[%s] %s" % ( self.state, x ) )
 
-    def __init__( self, localhost = ( testconfig.localIP, testconfig.localPort ), remotehost = ( testconfig.remoteIP, testconfig.remotePort ) ):
+    def unackedFins( self ):
         '''
-        Initialize the TCP State Machine.  The information for the local host and the
-        remote host must be provided as a 2 - tuple, a string and an integer.
-
-        Example:
-        >>> t = TcpStateMachine( ( "127.0.0.1", 10000 ), ( "192.168.0.1", 80 ) )
+        Returns a list of all FIN sequences in the outbound queue that have

>>> TRUNCATED FOR MAIL (1000 lines) <<<



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?200908170650.n7H6ovNw003066>