2011-11-17 11 views
5

Sto provando a scrivere un semplice script Python per interfacciarlo con un server di chat. Effettua il polling del server per gli aggiornamenti e consente all'utente di inserire il testo da inviare al server come chat. Posso ottenere qualcosa che viene hackerato insieme al multithreading, ma sembra terribile. C'è un modo semplice e semplice per visualizzare le informazioni di aggiornamento sullo schermo accettando anche l'input dell'utente? Preferirei farlo senza maledire.Script Python interattivo asincrono

+1

Per eseguire due attività contemporaneamente, sono necessari i thread. È necessario disporre di un thread per gestire l'input dell'utente, probabilmente il thread principale. Quindi crea un altro thread per gestire le risposte provenienti dal server di chat. –

+0

Ho ottenuto quella parte di lavoro, ma non viene visualizzata correttamente. Sto usando raw_input per ottenere input, e viene incasinato quando si stampa il testo dopo che è stato chiamato. –

+2

@HunterMcMillen - non del tutto vero - alcuni toolkit UI (G) e framework di comunicazione utilizzano un ciclo non thread (spesso basato su 'select'), ad es. PyGTK e Twisted. – detly

risposta

0

Lascia che la finestra di visualizzazione/schermo sia alimentata da un db locale temporaneo.

Codice 1: Aggiorna lo schermo da db. È possibile utilizzare loop infinito, con o senza una pausa (frequenza di aggiornamento)

Codice 2: Aggiornamenti db non appena l'utente inserisce qualcosa

Codice 3: continuamente controllare gli aggiornamenti dal server di chat e aggiorna la db non appena vengono ricevuti nuovi obiettivi.

0

Non so come codificare con ncurses, ma ecco una soluzione con wxWidget. Dovrebbe essere approssimativamente simile per quanto riguarda il design.

"""Asynchronous interactive Python script""" 
import random 
import threading 
import wx 
import wx.lib.mixins.listctrl as listmix 

LOCK = threading.Lock() 

def threadsafe(function): 
    """A decorator that makes a function safe against concurrent accesses.""" 
    def _decorated_function(*args, **kwargs): 
     """Replacement function.""" 
     with LOCK: 
      function(*args, **kwargs) 
    return _decorated_function 


class SharedList(wx.ListCtrl, listmix.ListCtrlAutoWidthMixin): 
    """An output list that can print information from both the user and the server. 

    N.B.: The _print function that actually updates the list content uses the threadsafe decorator. 
    """ 

    def __init__(self, parent, pos=wx.DefaultPosition, size=(-1, -1), style=wx.LC_REPORT): 
     wx.ListCtrl.__init__(self, parent, wx.ID_ANY, pos, size, style) 
     self.InsertColumn(0, 'Origin', width=75) 
     self.InsertColumn(1, 'Output') 
     listmix.ListCtrlAutoWidthMixin.__init__(self) 
     self.resizeLastColumn(1000) 
     self._list_index = 0 

    def user_print(self, text): 
     """Print a line as the user""" 
     self._print("user", text) 

    def chat_print(self, text): 
     """Print a line as the chat server""" 
     self._print("chat", text) 

    @threadsafe 
    def _print(self, origin, text): 
     """Generic print function.""" 
     self.InsertStringItem(self._list_index, str(origin)) 
     self.SetStringItem(self._list_index, 1, str(text)) 
     self.EnsureVisible(self.GetItemCount() - 1) 
     self._list_index = self._list_index + 1 


class ServerChecker(threading.Thread): 
    """A separate thread that would connect to the IRC chat.""" 

    def __init__(self, shared_list): 
     threading.Thread.__init__(self) 
     self._stop_event = threading.Event() 
     self._shared_list = shared_list 

    def run(self): 
     """Connection to the server, socket, listen, bla bla bla.""" 
     while not self._stop_event.is_set(): 
      self._shared_list.chat_print("bla bla bla") 
      self._stop_event.wait(random.randint(1, 3)) 

    def stop(self): 
     """Stop the thread.""" 
     self._stop_event.set() 


class SampleFrame(wx.Frame): 
    """The main GUI element.""" 

    def __init__(self): 
     super(SampleFrame, self).__init__(parent=None, title='DAS Board', size=(600, 400)) 
     self._shared_list = None 
     self._user_text = None 
     self._init_ui() 
     self._thread = ServerChecker(self._shared_list) 
     self._thread.start() 
     self.Bind(wx.EVT_CLOSE, self._on_close) 
     self.Center() 
     self.Show() 

    def _init_ui(self): 
     """Building and assembling the graphical elements. 
     Don't pay too much attention, especially if you want to use ncurses instead. 
     """ 
     panel = wx.Panel(self) 
     self._shared_list = SharedList(panel) 
     main_box_v = wx.BoxSizer(wx.VERTICAL) 
     main_box_v.Add(self._shared_list, proportion=1, flag=wx.EXPAND|wx.ALL, border=10) 
     self._user_text = wx.TextCtrl(panel, -1, value="User text to send...", 
             style=wx.TE_CENTRE) 
     main_box_v.Add(self._user_text, proportion=0, flag=wx.EXPAND|wx.ALL, border=10) 
     button = wx.Button(panel, label="Send user text.") 
     button.Bind(wx.EVT_BUTTON, self._user_send_text) 
     main_box_v.Add(button, flag=wx.EXPAND|wx.ALL, border=10) 
     panel.SetSizer(main_box_v) 

    def _user_send_text(self, event): 
     """Button callback""" 
     self._shared_list.user_print(self._user_text.GetValue()) 
     event.Skip() 

    def _on_close(self, event): 
     """Stop the separate thread, then destroy the GUI.""" 
     event.Skip() 
     self._thread.stop() 
     self.Destroy() 


APP = wx.App(0) 
SampleFrame() 
APP.MainLoop() 
Problemi correlati