"""This module implements the TFTP Server functionality. Instantiate the
server, and then run the listen() method to listen for client requests.
Logging is performed via a standard logging object set in TftpShared."""
| 5 | |
| 6 | import socket, os, time |
| 7 | import select |
| 8 | import threading |
| 9 | from TftpShared import * |
| 10 | from TftpPacketTypes import * |
| 11 | from TftpPacketFactory import TftpPacketFactory |
| 12 | from TftpContexts import TftpContextServer |
| 13 | |
class TftpServer(TftpSession):
    """This class implements a tftp server object. Run the listen() method to
    listen for client requests. It takes two optional arguments. tftproot is
    the path to the tftproot directory to serve files from and/or write them
    to. dyn_file_func is a callable that must return a file-like object to
    read from during downloads. This permits the serving of dynamic
    content."""

    def __init__(self, tftproot='/tftpboot', dyn_file_func=None):
        """Record the server configuration and validate it.

        When dyn_file_func is supplied it only has to be callable and the
        tftproot is not inspected. Otherwise tftproot must exist, be a
        directory and be readable; a root that is not writable only
        produces a warning.

        Raises TftpException when the validation fails."""
        # Address and port of the listening socket; filled in by listen().
        self.listenip = None
        self.listenport = None
        self.sock = None
        # FIXME: What about multiple roots?
        self.root = os.path.abspath(tftproot)
        self.dyn_file_func = dyn_file_func
        # A dict of sessions, where each session is keyed by a string like
        # ip:tid for the remote end.
        self.sessions = {}
        # A threading event to help threads synchronize with the server
        # is_running state.
        self.is_running = threading.Event()

        # Shutdown requests, set by stop() and honoured by the listen() loop.
        self.shutdown_gracefully = False
        self.shutdown_immediately = False

        if self.dyn_file_func:
            if not callable(self.dyn_file_func):
                raise TftpException(
                    "A dyn_file_func supplied, but it is not callable.")
        elif os.path.exists(self.root):
            log.debug("tftproot %s does exist", self.root)
            if not os.path.isdir(self.root):
                raise TftpException("The tftproot must be a directory.")
            log.debug("tftproot %s is a directory", self.root)
            if not os.access(self.root, os.R_OK):
                raise TftpException("The tftproot must be readable")
            log.debug("tftproot %s is readable", self.root)
            if os.access(self.root, os.W_OK):
                log.debug("tftproot %s is writable", self.root)
            else:
                log.warning("The tftproot %s is not writable" % self.root)
        else:
            raise TftpException("The tftproot does not exist.")

    def listen(self,
               listenip="",
               listenport=DEF_TFTP_PORT,
               timeout=SOCK_TIMEOUT):
        """Start a server listening on the supplied interface and port. This
        defaults to INADDR_ANY (all interfaces) and the DEF_TFTP_PORT port.
        You can also supply a different socket timeout value, if desired; it
        is handed to every new session context and is also the longest time
        a single select() call waits, so it bounds how long a stop() request
        can go unnoticed.

        The call returns once a shutdown requested through stop() has been
        carried out. socket.error from creating or binding the socket is
        propagated to the caller."""
        if not listenip:
            listenip = '0.0.0.0'
        log.info("Server requested on ip %s, port %s"
                 % (listenip, listenport))

        # FIXME - sockets should be non-blocking
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.sock.bind((listenip, listenport))
            # Ask the socket for the address: the requested port may have
            # been 0, in which case the OS picked one.
            self.listenip, self.listenport = self.sock.getsockname()
        except socket.error:
            # Do not leave a half-configured socket behind.
            self.sock.close()
            raise

        self.is_running.set()

        log.info("Starting receive loop...")
        try:
            while True:
                log.debug("shutdown_immediately is %s",
                          self.shutdown_immediately)
                log.debug("shutdown_gracefully is %s",
                          self.shutdown_gracefully)
                if self.shutdown_immediately:
                    log.warning("Shutting down now. Session count: %d"
                                % len(self.sessions))
                    self.sock.close()
                    for session in self.sessions.values():
                        session.end()
                    # Must stay a dict so that listen() can be called again.
                    self.sessions = {}
                    break
                elif self.shutdown_gracefully and not self.sessions:
                    log.warning("In graceful shutdown mode and all "
                                "sessions complete.")
                    self.sock.close()
                    break

                # Build the inputlist array of sockets to select() on.
                inputlist = [self.sock]
                inputlist.extend(session.sock
                                 for session in self.sessions.values())

                # Block until some socket has input on it, or until the
                # timeout expires.
                log.debug("Performing select on this inputlist: %s",
                          inputlist)
                readyinput, _, _ = select.select(inputlist, [], [], timeout)

                # Keys of the sessions to drop at the end of this pass.
                deletion_list = []

                # Handle the available data, if any. Maybe we timed-out.
                for readysock in readyinput:
                    # Is the traffic on the main server socket? ie. new
                    # session?
                    if readysock == self.sock:
                        self._accept_request(timeout, deletion_list)
                    else:
                        self._service_session(readysock, deletion_list)

                self._check_timeouts(deletion_list)
                self._reap_sessions(deletion_list)
        finally:
            # Also reached when an unexpected exception escapes the loop, so
            # that the port is released and waiters on is_running are not
            # left believing the server is still up. Closing an already
            # closed socket is harmless.
            self.sock.close()
            self.is_running.clear()

        log.debug("server returning from while loop")
        self.shutdown_gracefully = self.shutdown_immediately = False

    def _accept_request(self, timeout, deletion_list):
        """Read one datagram from the main socket and start a session for it.

        The datagram is discarded while a graceful shutdown is pending. A
        session whose start() raises TftpException is queued on
        deletion_list."""
        log.debug("Data ready on our main socket")
        data, (raddress, rport) = self.sock.recvfrom(MAX_BLKSIZE)

        log.debug("Read %d bytes", len(data))

        if self.shutdown_gracefully:
            log.warning("Discarding data on main port, "
                        "in graceful shutdown mode")
            return

        # Forge a session key based on the client's IP and port,
        # which should safely work through NAT.
        key = "%s:%s" % (raddress, rport)

        if key in self.sessions:
            log.warning("received traffic on main socket for "
                        "existing session??")
            log.info("Currently handling these sessions:")
            for session in self.sessions.values():
                log.info(" %s" % session)
            return

        log.debug("Creating new server context for "
                  "session key = %s", key)
        self.sessions[key] = TftpContextServer(raddress,
                                               rport,
                                               timeout,
                                               self.root,
                                               self.dyn_file_func)
        try:
            self.sessions[key].start(data)
        except TftpException as err:
            deletion_list.append(key)
            log.error("Fatal exception thrown from "
                      "session %s: %s" % (key, str(err)))

    def _service_session(self, readysock, deletion_list):
        """Run one cycle() of the session that owns readysock.

        The session is queued on deletion_list when its state becomes None
        after the cycle, or when the cycle raises TftpException."""
        # Must find the owner of this traffic.
        for key, session in self.sessions.items():
            if readysock == session.sock:
                log.info("Matched input to session key %s"
                         % key)
                try:
                    session.cycle()
                    if session.state is None:
                        log.info("Successful transfer.")
                        deletion_list.append(key)
                except TftpException as err:
                    deletion_list.append(key)
                    log.error("Fatal exception thrown from "
                              "session %s: %s"
                              % (key, str(err)))
                # Stop searching since we found the correct session.
                break
        else:
            log.error("Can't find the owner for this packet. "
                      "Discarding.")

    def _check_timeouts(self, deletion_list):
        """Resend on, or give up on, every session that has timed out.

        A session that reaches TIMEOUT_RETRIES is queued on deletion_list."""
        log.debug("Looping on all sessions to check for timeouts")
        now = time.time()
        for key, session in self.sessions.items():
            if key in deletion_list:
                # Already finished or failed during this pass: it must not
                # be asked to resend, nor be queued for deletion twice.
                continue
            try:
                session.checkTimeout(now)
            except TftpTimeout as err:
                log.error(str(err))
                session.retry_count += 1
                if session.retry_count >= TIMEOUT_RETRIES:
                    log.debug("hit max retries on %s, giving up",
                              session)
                    deletion_list.append(key)
                else:
                    log.debug("resending on session %s", session)
                    session.state.resendLast()

    def _reap_sessions(self, deletion_list):
        """End, report the metrics of, and forget every queued session."""
        log.debug("Iterating deletion list.")
        for key in deletion_list:
            log.info('')
            log.info("Session %s complete" % key)
            if key in self.sessions:
                log.debug("Gathering up metrics from session before deleting")
                self.sessions[key].end()
                metrics = self.sessions[key].metrics
                if metrics.duration == 0:
                    log.info("Duration too short, rate undetermined")
                else:
                    log.info("Transferred %d bytes in %.2f seconds"
                             % (metrics.bytes, metrics.duration))
                    log.info("Average rate: %.2f kbps" % metrics.kbps)
                log.info("%.2f bytes in resent data" % metrics.resent_bytes)
                log.info("%d duplicate packets" % metrics.dupcount)
                log.debug("Deleting session %s", key)
                del self.sessions[key]
                log.debug("Session list is now %s", self.sessions)
            else:
                log.warning("Strange, session %s is not on the deletion list"
                            % key)

    def stop(self, now=False):
        """Stop the server gracefully. Do not take any new transfers,
        but complete the existing ones. If now is True, drop everything
        and stop. Note, neither request interrupts the select() call in
        listen(); it is acted upon when select() returns on ready data or
        on its timeout, ie. the timeout passed to listen()."""
        if now:
            self.shutdown_immediately = True
        else:
            self.shutdown_gracefully = True