| """This module implements the TFTP Server functionality. Instantiate an |
| instance of the server, and then run the listen() method to listen for client |
| requests. Logging is performed via a standard logging object set in |
| TftpShared.""" |
| |
| import socket, os, time |
| import select |
| import threading |
| from TftpShared import * |
| from TftpPacketTypes import * |
| from TftpPacketFactory import TftpPacketFactory |
| from TftpContexts import TftpContextServer |
| |
| class TftpServer(TftpSession): |
| """This class implements a tftp server object. Run the listen() method to |
| listen for client requests. It takes two optional arguments. tftproot is |
| the path to the tftproot directory to serve files from and/or write them |
| to. dyn_file_func is a callable that must return a file-like object to |
| read from during downloads. This permits the serving of dynamic |
| content.""" |
| |
| def __init__(self, tftproot='/tftpboot', dyn_file_func=None): |
| self.listenip = None |
| self.listenport = None |
| self.sock = None |
| # FIXME: What about multiple roots? |
| self.root = os.path.abspath(tftproot) |
| self.dyn_file_func = dyn_file_func |
| # A dict of sessions, where each session is keyed by a string like |
| # ip:tid for the remote end. |
| self.sessions = {} |
| # A threading event to help threads synchronize with the server |
| # is_running state. |
| self.is_running = threading.Event() |
| |
| self.shutdown_gracefully = False |
| self.shutdown_immediately = False |
| |
| if self.dyn_file_func: |
| if not callable(self.dyn_file_func): |
| raise TftpException, "A dyn_file_func supplied, but it is not callable." |
| elif os.path.exists(self.root): |
| log.debug("tftproot %s does exist", self.root) |
| if not os.path.isdir(self.root): |
| raise TftpException, "The tftproot must be a directory." |
| else: |
| log.debug("tftproot %s is a directory", self.root) |
| if os.access(self.root, os.R_OK): |
| log.debug("tftproot %s is readable", self.root) |
| else: |
| raise TftpException, "The tftproot must be readable" |
| if os.access(self.root, os.W_OK): |
| log.debug("tftproot %s is writable", self.root) |
| else: |
| log.warning("The tftproot %s is not writable" % self.root) |
| else: |
| raise TftpException, "The tftproot does not exist." |
| |
| def listen(self, |
| listenip="", |
| listenport=DEF_TFTP_PORT, |
| timeout=SOCK_TIMEOUT): |
| """Start a server listening on the supplied interface and port. This |
| defaults to INADDR_ANY (all interfaces) and UDP port 69. You can also |
| supply a different socket timeout value, if desired.""" |
| tftp_factory = TftpPacketFactory() |
| |
| # Don't use new 2.5 ternary operator yet |
| # listenip = listenip if listenip else '0.0.0.0' |
| if not listenip: listenip = '0.0.0.0' |
| log.info("Server requested on ip %s, port %s" |
| % (listenip, listenport)) |
| try: |
| # FIXME - sockets should be non-blocking |
| self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| self.sock.bind((listenip, listenport)) |
| _, self.listenport = self.sock.getsockname() |
| except socket.error, err: |
| # Reraise it for now. |
| raise |
| |
| self.is_running.set() |
| |
| log.info("Starting receive loop...") |
| while True: |
| log.debug("shutdown_immediately is %s", self.shutdown_immediately) |
| log.debug("shutdown_gracefully is %s", self.shutdown_gracefully) |
| if self.shutdown_immediately: |
| log.warn("Shutting down now. Session count: %d" % len(self.sessions)) |
| self.sock.close() |
| for key in self.sessions: |
| self.sessions[key].end() |
| self.sessions = [] |
| break |
| |
| elif self.shutdown_gracefully: |
| if not self.sessions: |
| log.warn("In graceful shutdown mode and all sessions complete.") |
| self.sock.close() |
| break |
| |
| # Build the inputlist array of sockets to select() on. |
| inputlist = [] |
| inputlist.append(self.sock) |
| for key in self.sessions: |
| inputlist.append(self.sessions[key].sock) |
| |
| # Block until some socket has input on it. |
| log.debug("Performing select on this inputlist: %s", inputlist) |
| readyinput, readyoutput, readyspecial = select.select(inputlist, |
| [], |
| [], |
| SOCK_TIMEOUT) |
| |
| deletion_list = [] |
| |
| # Handle the available data, if any. Maybe we timed-out. |
| for readysock in readyinput: |
| # Is the traffic on the main server socket? ie. new session? |
| if readysock == self.sock: |
| log.debug("Data ready on our main socket") |
| buffer, (raddress, rport) = self.sock.recvfrom(MAX_BLKSIZE) |
| |
| log.debug("Read %d bytes", len(buffer)) |
| |
| if self.shutdown_gracefully: |
| log.warn("Discarding data on main port, in graceful shutdown mode") |
| continue |
| |
| # Forge a session key based on the client's IP and port, |
| # which should safely work through NAT. |
| key = "%s:%s" % (raddress, rport) |
| |
| if not self.sessions.has_key(key): |
| log.debug("Creating new server context for " |
| "session key = %s", key) |
| self.sessions[key] = TftpContextServer(raddress, |
| rport, |
| timeout, |
| self.root, |
| self.dyn_file_func) |
| try: |
| self.sessions[key].start(buffer) |
| except TftpException, err: |
| deletion_list.append(key) |
| log.error("Fatal exception thrown from " |
| "session %s: %s" % (key, str(err))) |
| else: |
| log.warn("received traffic on main socket for " |
| "existing session??") |
| log.info("Currently handling these sessions:") |
| for session_key, session in self.sessions.items(): |
| log.info(" %s" % session) |
| |
| else: |
| # Must find the owner of this traffic. |
| for key in self.sessions: |
| if readysock == self.sessions[key].sock: |
| log.info("Matched input to session key %s" |
| % key) |
| try: |
| self.sessions[key].cycle() |
| if self.sessions[key].state == None: |
| log.info("Successful transfer.") |
| deletion_list.append(key) |
| except TftpException, err: |
| deletion_list.append(key) |
| log.error("Fatal exception thrown from " |
| "session %s: %s" |
| % (key, str(err))) |
| # Break out of for loop since we found the correct |
| # session. |
| break |
| |
| else: |
| log.error("Can't find the owner for this packet. " |
| "Discarding.") |
| |
| log.debug("Looping on all sessions to check for timeouts") |
| now = time.time() |
| for key in self.sessions: |
| try: |
| self.sessions[key].checkTimeout(now) |
| except TftpTimeout, err: |
| log.error(str(err)) |
| self.sessions[key].retry_count += 1 |
| if self.sessions[key].retry_count >= TIMEOUT_RETRIES: |
| log.debug("hit max retries on %s, giving up", |
| self.sessions[key]) |
| deletion_list.append(key) |
| else: |
| log.debug("resending on session %s", self.sessions[key]) |
| self.sessions[key].state.resendLast() |
| |
| log.debug("Iterating deletion list.") |
| for key in deletion_list: |
| log.info('') |
| log.info("Session %s complete" % key) |
| if self.sessions.has_key(key): |
| log.debug("Gathering up metrics from session before deleting") |
| self.sessions[key].end() |
| metrics = self.sessions[key].metrics |
| if metrics.duration == 0: |
| log.info("Duration too short, rate undetermined") |
| else: |
| log.info("Transferred %d bytes in %.2f seconds" |
| % (metrics.bytes, metrics.duration)) |
| log.info("Average rate: %.2f kbps" % metrics.kbps) |
| log.info("%.2f bytes in resent data" % metrics.resent_bytes) |
| log.info("%d duplicate packets" % metrics.dupcount) |
| log.debug("Deleting session %s", key) |
| del self.sessions[key] |
| log.debug("Session list is now %s", self.sessions) |
| else: |
| log.warn("Strange, session %s is not on the deletion list" |
| % key) |
| |
| self.is_running.clear() |
| |
| log.debug("server returning from while loop") |
| self.shutdown_gracefully = self.shutdown_immediately = False |
| |
| def stop(self, now=False): |
| """Stop the server gracefully. Do not take any new transfers, |
| but complete the existing ones. If force is True, drop everything |
| and stop. Note, immediately will not interrupt the select loop, it |
| will happen when the server returns on ready data, or a timeout. |
| ie. SOCK_TIMEOUT""" |
| if now: |
| self.shutdown_immediately = True |
| else: |
| self.shutdown_gracefully = True |