Yocto 2.4

Move OpenBMC to Yocto 2.4(rocko)

Tested: Built and verified Witherspoon and Palmetto images
Change-Id: I12057b18610d6fb0e6903c60213690301e9b0c67
Signed-off-by: Brad Bishop <bradleyb@fuzziesquirrel.com>
diff --git a/import-layers/yocto-poky/bitbake/lib/bb/server/process.py b/import-layers/yocto-poky/bitbake/lib/bb/server/process.py
index c3c1450..3d31355 100644
--- a/import-layers/yocto-poky/bitbake/lib/bb/server/process.py
+++ b/import-layers/yocto-poky/bitbake/lib/bb/server/process.py
@@ -22,125 +22,245 @@
 
 import bb
 import bb.event
-import itertools
 import logging
 import multiprocessing
+import threading
+import array
 import os
-import signal
 import sys
 import time
 import select
-from queue import Empty
-from multiprocessing import Event, Process, util, Queue, Pipe, queues, Manager
-
-from . import BitBakeBaseServer, BitBakeBaseServerConnection, BaseImplServer
+import socket
+import subprocess
+import errno
+import re
+import datetime
+import bb.server.xmlrpcserver
+from bb import daemonize
+from multiprocessing import queues
 
 logger = logging.getLogger('BitBake')
 
-class ServerCommunicator():
-    def __init__(self, connection, event_handle, server):
-        self.connection = connection
-        self.event_handle = event_handle
-        self.server = server
+class ProcessTimeout(SystemExit):
+    pass
 
-    def runCommand(self, command):
-        # @todo try/except
-        self.connection.send(command)
-
-        if not self.server.is_alive():
-            raise SystemExit
-
-        while True:
-            # don't let the user ctrl-c while we're waiting for a response
-            try:
-                for idx in range(0,4): # 0, 1, 2, 3
-                    if self.connection.poll(5):
-                        return self.connection.recv()
-                    else:
-                        bb.warn("Timeout while attempting to communicate with bitbake server")
-                bb.fatal("Gave up; Too many tries: timeout while attempting to communicate with bitbake server")
-            except KeyboardInterrupt:
-                pass
-
-    def getEventHandle(self):
-        return self.event_handle.value
-
-class EventAdapter():
-    """
-    Adapter to wrap our event queue since the caller (bb.event) expects to
-    call a send() method, but our actual queue only has put()
-    """
-    def __init__(self, queue):
-        self.queue = queue
-
-    def send(self, event):
-        try:
-            self.queue.put(event)
-        except Exception as err:
-            print("EventAdapter puked: %s" % str(err))
-
-
-class ProcessServer(Process, BaseImplServer):
+class ProcessServer(multiprocessing.Process):
     profile_filename = "profile.log"
     profile_processed_filename = "profile.log.processed"
 
-    def __init__(self, command_channel, event_queue, featurelist):
-        BaseImplServer.__init__(self)
-        Process.__init__(self)
-        self.command_channel = command_channel
-        self.event_queue = event_queue
-        self.event = EventAdapter(event_queue)
-        self.featurelist = featurelist
+    def __init__(self, lock, sock, sockname):
+        multiprocessing.Process.__init__(self)
+        self.command_channel = False
+        self.command_channel_reply = False
         self.quit = False
         self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
         self.next_heartbeat = time.time()
 
-        self.quitin, self.quitout = Pipe()
-        self.event_handle = multiprocessing.Value("i")
+        self.event_handle = None
+        self.haveui = False
+        self.lastui = False
+        self.xmlrpc = False
+
+        self._idlefuns = {}
+
+        self.bitbake_lock = lock
+        self.sock = sock
+        self.sockname = sockname
+
+    def register_idle_function(self, function, data):
+        """Register a function to be called while the server is idle"""
+        assert hasattr(function, '__call__')
+        self._idlefuns[function] = data
 
     def run(self):
-        for event in bb.event.ui_queue:
-            self.event_queue.put(event)
-        self.event_handle.value = bb.event.register_UIHhandler(self, True)
+
+        if self.xmlrpcinterface[0]:
+            self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)
+
+            print("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
 
         heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
         if heartbeat_event:
             try:
                 self.heartbeat_seconds = float(heartbeat_event)
             except:
-                # Throwing an exception here causes bitbake to hang.
-                # Just warn about the invalid setting and continue
                 bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
-        bb.cooker.server_main(self.cooker, self.main)
+
+        self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
+        try:
+            if self.timeout:
+                self.timeout = float(self.timeout)
+        except:
+            bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
+
+
+        try:
+            self.bitbake_lock.seek(0)
+            self.bitbake_lock.truncate()
+            if self.xmlrpc:
+                self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
+            else:
+                self.bitbake_lock.write("%s\n" % (os.getpid()))
+            self.bitbake_lock.flush()
+        except Exception as e:
+            print("Error writing to lock file: %s" % str(e))
+            pass
+
+        if self.cooker.configuration.profile:
+            try:
+                import cProfile as profile
+            except:
+                import profile
+            prof = profile.Profile()
+
+            ret = profile.Profile.runcall(prof, self.main)
+
+            prof.dump_stats("profile.log")
+            bb.utils.process_profilelog("profile.log")
+            print("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
+
+        else:
+            ret = self.main()
+
+        return ret
 
     def main(self):
-        # Ignore SIGINT within the server, as all SIGINT handling is done by
-        # the UI and communicated to us
-        self.quitin.close()
-        signal.signal(signal.SIGINT, signal.SIG_IGN)
+        self.cooker.pre_serve()
+
         bb.utils.set_process_name("Cooker")
+
+        ready = []
+
+        self.controllersock = False
+        fds = [self.sock]
+        if self.xmlrpc:
+            fds.append(self.xmlrpc)
+        print("Entering server connection loop")
+
+        def disconnect_client(self, fds):
+            if not self.haveui:
+                return
+            print("Disconnecting Client")
+            fds.remove(self.controllersock)
+            fds.remove(self.command_channel)
+            bb.event.unregister_UIHhandler(self.event_handle, True)
+            self.command_channel_reply.writer.close()
+            self.event_writer.writer.close()
+            del self.event_writer
+            self.controllersock.close()
+            self.controllersock = False
+            self.haveui = False
+            self.lastui = time.time()
+            self.cooker.clientComplete()
+            if self.timeout is None:
+                print("No timeout, exiting.")
+                self.quit = True
+
         while not self.quit:
-            try:
-                if self.command_channel.poll():
-                    command = self.command_channel.recv()
-                    self.runCommand(command)
-                if self.quitout.poll():
-                    self.quitout.recv()
+            if self.sock in ready:
+                self.controllersock, address = self.sock.accept()
+                if self.haveui:
+                    print("Dropping connection attempt as we have a UI %s" % (str(ready)))
+                    self.controllersock.close()
+                else:
+                    print("Accepting %s" % (str(ready)))
+                    fds.append(self.controllersock)
+            if self.controllersock in ready:
+                try:
+                    print("Connecting Client")
+                    ui_fds = recvfds(self.controllersock, 3)
+
+                    # Where to write events to
+                    writer = ConnectionWriter(ui_fds[0])
+                    self.event_handle = bb.event.register_UIHhandler(writer, True)
+                    self.event_writer = writer
+
+                    # Where to read commands from
+                    reader = ConnectionReader(ui_fds[1])
+                    fds.append(reader)
+                    self.command_channel = reader
+
+                    # Where to send command return values to
+                    writer = ConnectionWriter(ui_fds[2])
+                    self.command_channel_reply = writer
+
+                    self.haveui = True
+
+                except (EOFError, OSError):
+                    disconnect_client(self, fds)
+
+            if not self.timeout == -1.0 and not self.haveui and self.lastui and self.timeout and \
+                    (self.lastui + self.timeout) < time.time():
+                print("Server timeout, exiting.")
+                self.quit = True
+
+            if self.command_channel in ready:
+                try:
+                    command = self.command_channel.get()
+                except EOFError:
+                    # Client connection shutting down
+                    ready = []
+                    disconnect_client(self, fds)
+                    continue
+                if command[0] == "terminateServer":
                     self.quit = True
+                    continue
+                try:
+                    print("Running command %s" % command)
+                    self.command_channel_reply.send(self.cooker.command.runCommand(command))
+                except Exception as e:
+                   logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e)))
+
+            if self.xmlrpc in ready:
+                self.xmlrpc.handle_requests()
+
+            ready = self.idle_commands(.1, fds)
+
+        print("Exiting")
+        # Remove the socket file so we don't get any more connections to avoid races
+        os.unlink(self.sockname)
+        self.sock.close()
+
+        try: 
+            self.cooker.shutdown(True)
+        except:
+            pass
+
+        self.cooker.post_serve()
+
+        # Finally release the lockfile but warn about other processes holding it open
+        lock = self.bitbake_lock
+        lockfile = lock.name
+        lock.close()
+        lock = None
+
+        while not lock:
+            with bb.utils.timeout(3):
+                lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=True)
+                if not lock:
+                    # Some systems may not have lsof available
+                    procs = None
                     try:
-                        self.runCommand(["stateForceShutdown"])
-                    except:
-                        pass
+                        procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
+                    except OSError as e:
+                        if e.errno != errno.ENOENT:
+                            raise
+                    if procs is None:
+                        # Fall back to fuser if lsof is unavailable
+                        try:
+                            procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
+                        except OSError as e:
+                            if e.errno != errno.ENOENT:
+                                raise
 
-                self.idle_commands(.1, [self.command_channel, self.quitout])
-            except Exception:
-                logger.exception('Running command %s', command)
-
-        self.event_queue.close()
-        bb.event.unregister_UIHhandler(self.event_handle.value)
-        self.command_channel.close()
-        self.cooker.shutdown(True)
-        self.quitout.close()
+                    msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock"
+                    if procs:
+                        msg += ":\n%s" % str(procs)
+                    print(msg)
+                    return
+        # We hold the lock so we can remove the file (hide stale pid data)
+        bb.utils.remove(lockfile)
+        bb.utils.unlockfile(lock)
 
     def idle_commands(self, delay, fds=None):
         nextsleep = delay
@@ -186,109 +306,317 @@
             nextsleep = self.next_heartbeat - now
 
         if nextsleep is not None:
-            select.select(fds,[],[],nextsleep)
+            if self.xmlrpc:
+                nextsleep = self.xmlrpc.get_timeout(nextsleep)
+            try:
+                return select.select(fds,[],[],nextsleep)[0]
+            except InterruptedError:
+                # Ignore EINTR
+                return []
+        else:
+            return select.select(fds,[],[],0)[0]
+
+
+class ServerCommunicator():
+    def __init__(self, connection, recv):
+        self.connection = connection
+        self.recv = recv
 
     def runCommand(self, command):
-        """
-        Run a cooker command on the server
-        """
-        self.command_channel.send(self.cooker.command.runCommand(command))
+        self.connection.send(command)
+        if not self.recv.poll(30):
+            raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server")
+        return self.recv.get()
 
-    def stop(self):
-        self.quitin.send("quit")
-        self.quitin.close()
-
-class BitBakeProcessServerConnection(BitBakeBaseServerConnection):
-    def __init__(self, serverImpl, ui_channel, event_queue):
-        self.procserver = serverImpl
-        self.ui_channel = ui_channel
-        self.event_queue = event_queue
-        self.connection = ServerCommunicator(self.ui_channel, self.procserver.event_handle, self.procserver)
-        self.events = self.event_queue
-        self.terminated = False
-
-    def sigterm_terminate(self):
-        bb.error("UI received SIGTERM")
-        self.terminate()
-
-    def terminate(self):
-        if self.terminated:
-            return
-        self.terminated = True
-        def flushevents():
-            while True:
-                try:
-                    event = self.event_queue.get(block=False)
-                except (Empty, IOError):
-                    break
-                if isinstance(event, logging.LogRecord):
-                    logger.handle(event)
-
-        self.procserver.stop()
-
-        while self.procserver.is_alive():
-            flushevents()
-            self.procserver.join(0.1)
-
-        self.ui_channel.close()
-        self.event_queue.close()
-        self.event_queue.setexit()
-        # XXX: Call explicity close in _writer to avoid
-        # fd leakage because isn't called on Queue.close()
-        self.event_queue._writer.close()
-
-# Wrap Queue to provide API which isn't server implementation specific
-class ProcessEventQueue(multiprocessing.queues.Queue):
-    def __init__(self, maxsize):
-        multiprocessing.queues.Queue.__init__(self, maxsize, ctx=multiprocessing.get_context())
-        self.exit = False
-        bb.utils.set_process_name("ProcessEQueue")
-
-    def setexit(self):
-        self.exit = True
-
-    def waitEvent(self, timeout):
-        if self.exit:
-            return self.getEvent()
-        try:
-            if not self.server.is_alive():
-                return self.getEvent()
-            return self.get(True, timeout)
-        except Empty:
-            return None
-
-    def getEvent(self):
-        try:
-            if not self.server.is_alive():
-                self.setexit()
-            return self.get(False)
-        except Empty:
-            if self.exit:
-                sys.exit(1)
-            return None
-
-class BitBakeServer(BitBakeBaseServer):
-    def initServer(self, single_use=True):
-        # establish communication channels.  We use bidirectional pipes for
-        # ui <--> server command/response pairs
-        # and a queue for server -> ui event notifications
-        #
-        self.ui_channel, self.server_channel = Pipe()
-        self.event_queue = ProcessEventQueue(0)
-        self.serverImpl = ProcessServer(self.server_channel, self.event_queue, None)
-        self.event_queue.server = self.serverImpl
-
-    def detach(self):
-        self.serverImpl.start()
-        return
-
-    def establishConnection(self, featureset):
-
-        self.connection = BitBakeProcessServerConnection(self.serverImpl, self.ui_channel, self.event_queue)
-
-        _, error = self.connection.connection.runCommand(["setFeatures", featureset])
+    def updateFeatureSet(self, featureset):
+        _, error = self.runCommand(["setFeatures", featureset])
         if error:
             logger.error("Unable to set the cooker to the correct featureset: %s" % error)
             raise BaseException(error)
-        signal.signal(signal.SIGTERM, lambda i, s: self.connection.sigterm_terminate())
-        return self.connection
+
+    def getEventHandle(self):
+        handle, error = self.runCommand(["getUIHandlerNum"])
+        if error:
+            logger.error("Unable to get UI Handler Number: %s" % error)
+            raise BaseException(error)
+
+        return handle
+
+    def terminateServer(self):
+        self.connection.send(['terminateServer'])
+        return
+
+class BitBakeProcessServerConnection(object):
+    def __init__(self, ui_channel, recv, eq, sock):
+        self.connection = ServerCommunicator(ui_channel, recv)
+        self.events = eq
+        # Save sock so it doesn't get gc'd for the life of our connection
+        self.socket_connection = sock
+
+    def terminate(self):
+        self.socket_connection.close()
+        self.connection.connection.close()
+        self.connection.recv.close()
+        return
+
+class BitBakeServer(object):
+    start_log_format = '--- Starting bitbake server pid %s at %s ---'
+    start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
+
+    def __init__(self, lock, sockname, configuration, featureset):
+
+        self.configuration = configuration
+        self.featureset = featureset
+        self.sockname = sockname
+        self.bitbake_lock = lock
+        self.readypipe, self.readypipein = os.pipe()
+
+        # Create server control socket
+        if os.path.exists(sockname):
+            os.unlink(sockname)
+
+        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        # AF_UNIX has path length issues so chdir here to workaround
+        cwd = os.getcwd()
+        logfile = os.path.join(cwd, "bitbake-cookerdaemon.log")
+
+        try:
+            os.chdir(os.path.dirname(sockname))
+            self.sock.bind(os.path.basename(sockname))
+        finally:
+            os.chdir(cwd)
+        self.sock.listen(1)
+
+        os.set_inheritable(self.sock.fileno(), True)
+        startdatetime = datetime.datetime.now()
+        bb.daemonize.createDaemon(self._startServer, logfile)
+        self.sock.close()
+        self.bitbake_lock.close()
+
+        ready = ConnectionReader(self.readypipe)
+        r = ready.poll(30)
+        if r:
+            r = ready.get()
+        if not r or r != "ready":
+            ready.close()
+            bb.error("Unable to start bitbake server")
+            if os.path.exists(logfile):
+                logstart_re = re.compile(self.start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
+                started = False
+                lines = []
+                with open(logfile, "r") as f:
+                    for line in f:
+                        if started:
+                            lines.append(line)
+                        else:
+                            res = logstart_re.match(line.rstrip())
+                            if res:
+                                ldatetime = datetime.datetime.strptime(res.group(2), self.start_log_datetime_format)
+                                if ldatetime >= startdatetime:
+                                    started = True
+                                    lines.append(line)
+                if lines:
+                    if len(lines) > 10:
+                        bb.error("Last 10 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-10:])))
+                    else:
+                        bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines)))
+            raise SystemExit(1)
+        ready.close()
+        os.close(self.readypipein)
+
+    def _startServer(self):
+        print(self.start_log_format % (os.getpid(), datetime.datetime.now().strftime(self.start_log_datetime_format)))
+        server = ProcessServer(self.bitbake_lock, self.sock, self.sockname)
+        self.configuration.setServerRegIdleCallback(server.register_idle_function)
+        writer = ConnectionWriter(self.readypipein)
+        try:
+            self.cooker = bb.cooker.BBCooker(self.configuration, self.featureset)
+            writer.send("ready")
+        except:
+            writer.send("fail")
+            raise
+        finally:
+            os.close(self.readypipein)
+        server.cooker = self.cooker
+        server.server_timeout = self.configuration.server_timeout
+        server.xmlrpcinterface = self.configuration.xmlrpcinterface
+        print("Started bitbake server pid %d" % os.getpid())
+        server.start()
+
+def connectProcessServer(sockname, featureset):
+    # Connect to socket
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    # AF_UNIX has path length issues so chdir here to workaround
+    cwd = os.getcwd()
+
+    try:
+        os.chdir(os.path.dirname(sockname))
+        sock.connect(os.path.basename(sockname))
+    finally:
+        os.chdir(cwd)
+
+    readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None
+    eq = command_chan_recv = command_chan = None
+
+    try:
+
+        # Send an fd for the remote to write events to
+        readfd, writefd = os.pipe()
+        eq = BBUIEventQueue(readfd)
+        # Send an fd for the remote to recieve commands from
+        readfd1, writefd1 = os.pipe()
+        command_chan = ConnectionWriter(writefd1)
+        # Send an fd for the remote to write commands results to
+        readfd2, writefd2 = os.pipe()
+        command_chan_recv = ConnectionReader(readfd2)
+
+        sendfds(sock, [writefd, readfd1, writefd2])
+
+        server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)
+
+        # Close the ends of the pipes we won't use
+        for i in [writefd, readfd1, writefd2]:
+            os.close(i)
+
+        server_connection.connection.updateFeatureSet(featureset)
+
+    except (Exception, SystemExit) as e:
+        if command_chan_recv:
+            command_chan_recv.close()
+        if command_chan:
+            command_chan.close()
+        for i in [writefd, readfd1, writefd2]:
+            try:
+                os.close(i)
+            except OSError:
+                pass
+        sock.close()
+        raise
+
+    return server_connection
+
+def sendfds(sock, fds):
+        '''Send an array of fds over an AF_UNIX socket.'''
+        fds = array.array('i', fds)
+        msg = bytes([len(fds) % 256])
+        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
+
+def recvfds(sock, size):
+        '''Receive an array of fds over an AF_UNIX socket.'''
+        a = array.array('i')
+        bytes_size = a.itemsize * size
+        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
+        if not msg and not ancdata:
+            raise EOFError
+        try:
+            if len(ancdata) != 1:
+                raise RuntimeError('received %d items of ancdata' %
+                                   len(ancdata))
+            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+            if (cmsg_level == socket.SOL_SOCKET and
+                cmsg_type == socket.SCM_RIGHTS):
+                if len(cmsg_data) % a.itemsize != 0:
+                    raise ValueError
+                a.frombytes(cmsg_data)
+                assert len(a) % 256 == msg[0]
+                return list(a)
+        except (ValueError, IndexError):
+            pass
+        raise RuntimeError('Invalid data received')
+
+class BBUIEventQueue:
+    def __init__(self, readfd):
+
+        self.eventQueue = []
+        self.eventQueueLock = threading.Lock()
+        self.eventQueueNotify = threading.Event()
+
+        self.reader = ConnectionReader(readfd)
+
+        self.t = threading.Thread()
+        self.t.setDaemon(True)
+        self.t.run = self.startCallbackHandler
+        self.t.start()
+
+    def getEvent(self):
+        self.eventQueueLock.acquire()
+
+        if len(self.eventQueue) == 0:
+            self.eventQueueLock.release()
+            return None
+
+        item = self.eventQueue.pop(0)
+
+        if len(self.eventQueue) == 0:
+            self.eventQueueNotify.clear()
+
+        self.eventQueueLock.release()
+        return item
+
+    def waitEvent(self, delay):
+        self.eventQueueNotify.wait(delay)
+        return self.getEvent()
+
+    def queue_event(self, event):
+        self.eventQueueLock.acquire()
+        self.eventQueue.append(event)
+        self.eventQueueNotify.set()
+        self.eventQueueLock.release()
+
+    def send_event(self, event):
+        self.queue_event(pickle.loads(event))
+
+    def startCallbackHandler(self):
+        bb.utils.set_process_name("UIEventQueue")
+        while True:
+            try:
+                self.reader.wait()
+                event = self.reader.get()
+                self.queue_event(event)
+            except EOFError:
+                # Easiest way to exit is to close the file descriptor to cause an exit
+                break
+        self.reader.close()
+
+class ConnectionReader(object):
+
+    def __init__(self, fd):
+        self.reader = multiprocessing.connection.Connection(fd, writable=False)
+        self.rlock = multiprocessing.Lock()
+
+    def wait(self, timeout=None):
+        return multiprocessing.connection.wait([self.reader], timeout)
+
+    def poll(self, timeout=None):
+        return self.reader.poll(timeout)
+
+    def get(self):
+        with self.rlock:
+            res = self.reader.recv_bytes()
+        return multiprocessing.reduction.ForkingPickler.loads(res)
+
+    def fileno(self):
+        return self.reader.fileno()
+
+    def close(self):
+        return self.reader.close()
+
+
+class ConnectionWriter(object):
+
+    def __init__(self, fd):
+        self.writer = multiprocessing.connection.Connection(fd, readable=False)
+        self.wlock = multiprocessing.Lock()
+        # Why bb.event needs this I have no idea
+        self.event = self
+
+    def send(self, obj):
+        obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
+        with self.wlock:
+            self.writer.send_bytes(obj)
+
+    def fileno(self):
+        return self.writer.fileno()
+
+    def close(self):
+        return self.writer.close()