2012-09-13 18 views
12

Quello che vorrei fare è creare una sorta di "pipe" (come una pipe tra i processi), ma tra iostream C++ all'interno dello stesso programma. Ho una funzione che richiede un flusso di input come argomento, ma i miei dati provengono da un flusso di output. Quindi c'è un modo standard per convogliare l'output di un std::ostream nell'ingresso di un std::istream?C++ collega il flusso di output al flusso di input

+2

Lo std :: stringstream soddisfa le tue esigenze? Se no, spiega perché. – AProgrammer

+1

C'è un iostream (si noti che ha un 'i' e un' o' all'inizio). Pompi i dati in un'estremità e ne esce fuori. E 'questo quello che vuoi. –

+0

-1 sotto specifica domanda. –

risposta

13

È possibile creare un std::streambuf in cui l'uscita passa a un buffer e ai blocchi std::overflow() quando il buffer si riempie. D'altra parte avresti un buffer di input che blocca su underflow() quando il buffer si svuota. Ovviamente, la lettura e la scrittura sarebbero in due diversi thread.

Il difficile compito è come sincronizzare i due buffer: gli stream non utilizzano alcuna operazione di sincronizzazione durante l'accesso ai buffer. Solo quando viene chiamata una qualsiasi delle funzioni virtuali, è possibile intercettare l'operazione e gestire la sincronizzazione. D'altra parte, non usare un buffer è abbastanza inefficiente. Il modo in cui affronterò questo problema è utilizzando un buffer di uscita relativamente piccolo (ad esempio 256 char s) e anche l'override di sync() per utilizzare questa funzione per il trasferimento di caratteri nel buffer di input. Lo streambuf utilizza un mutex per la sincronizzazione e una variabile di condizione da bloccare su un buffer di input completo sull'output e un buffer di input vuoto sull'input. Per supportare lo shutdown pulito, dovrebbe esserci anche una funzione che imposta un flag in modo tale che non vi sia più input in arrivo e tutte le ulteriori operazioni di output dovrebbero fallire.

La creazione dell'implementazione effettiva rivela che due buffer non sono sufficienti: i thread che accedono all'entrata e al buffer di output possono essere attivi quando il rispettivo altro buffer blocca. Pertanto, è necessario un terzo buffer intermedio. Con questa piccola modifica al piano di cui sopra, di seguito è riportato un codice (utilizza piccoli buffer per assicurarsi che ci siano overflow e underflow effettivi, per un uso reale almeno il buffer di input dovrebbe probabilmente essere più grande).

// threadbuf.cpp              -*-C++-*- 
// ---------------------------------------------------------------------------- 
// Copyright (C) 2013 Dietmar Kuehl http://www.dietmar-kuehl.de   
//                  
// Permission is hereby granted, free of charge, to any person   
// obtaining a copy of this software and associated documentation  
// files (the "Software"), to deal in the Software without restriction, 
// including without limitation the rights to use, copy, modify,   
// merge, publish, distribute, sublicense, and/or sell copies of   
// the Software, and to permit persons to whom the Software is   
// furnished to do so, subject to the following conditions:    
//                  
// The above copyright notice and this permission notice shall be  
// included in all copies or substantial portions of the Software.  
//                  
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,  
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES  
// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND    
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT   
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,   
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR   
// OTHER DEALINGS IN THE SOFTWARE. 
// ---------------------------------------------------------------------------- 


#include <algorithm> 
#include <condition_variable> 
#include <iostream> 
#include <mutex> 
#include <stdexcept> 
#include <streambuf> 
#include <string> 
#include <thread> 

// ---------------------------------------------------------------------------- 

class threadbuf 
    : public std::streambuf 
{ 
private: 
    typedef std::streambuf::traits_type traits_type; 
    typedef std::string::size_type  string_size_t; 

    std::mutex    d_mutex; 
    std::condition_variable d_condition; 
    std::string    d_out; 
    std::string    d_in; 
    std::string    d_tmp; 
    char*     d_current; 
    bool     d_closed; 

public: 
    threadbuf(string_size_t out_size = 16, string_size_t in_size = 64) 
     : d_out(std::max(string_size_t(1), out_size), ' ') 
     , d_in(std::max(string_size_t(1), in_size), ' ') 
     , d_tmp(std::max(string_size_t(1), in_size), ' ') 
     , d_current(&this->d_tmp[0]) 
     , d_closed(false) 
    { 
     this->setp(&this->d_out[0], &this->d_out[0] + this->d_out.size() - 1); 
     this->setg(&this->d_in[0], &this->d_in[0], &this->d_in[0]); 
    } 
    void close() 
    { 
     { 
      std::unique_lock<std::mutex> lock(this->d_mutex); 
      this->d_closed = true; 
      while (this->pbase() != this->pptr()) { 
       this->internal_sync(lock); 
      } 
     } 
     this->d_condition.notify_all(); 
    } 

private: 
    int_type underflow() 
    { 
     if (this->gptr() == this->egptr()) 
     { 
      std::unique_lock<std::mutex> lock(this->d_mutex); 
      while (&this->d_tmp[0] == this->d_current && !this->d_closed) { 
       this->d_condition.wait(lock); 
      } 
      if (&this->d_tmp[0] != this->d_current) { 
       std::streamsize size(this->d_current - &this->d_tmp[0]); 
       traits_type::copy(this->eback(), &this->d_tmp[0], 
            this->d_current - &this->d_tmp[0]); 
       this->setg(this->eback(), this->eback(), this->eback() + size); 
       this->d_current = &this->d_tmp[0]; 
       this->d_condition.notify_one(); 
      } 
     } 
     return this->gptr() == this->egptr() 
      ? traits_type::eof() 
      : traits_type::to_int_type(*this->gptr()); 
    } 
    int_type overflow(int_type c) 
    { 
     std::unique_lock<std::mutex> lock(this->d_mutex); 
     if (!traits_type::eq_int_type(c, traits_type::eof())) { 
      *this->pptr() = traits_type::to_char_type(c); 
      this->pbump(1); 
     } 
     return this->internal_sync(lock) 
      ? traits_type::eof() 
      : traits_type::not_eof(c); 
    } 
    int sync() 
    { 
     std::unique_lock<std::mutex> lock(this->d_mutex); 
     return this->internal_sync(lock); 
    } 
    int internal_sync(std::unique_lock<std::mutex>& lock) 
    { 
     char* end(&this->d_tmp[0] + this->d_tmp.size()); 
     while (this->d_current == end && !this->d_closed) { 
      this->d_condition.wait(lock); 
     } 
     if (this->d_current != end) 
     { 
      std::streamsize size(std::min(end - d_current, 
              this->pptr() - this->pbase())); 
      traits_type::copy(d_current, this->pbase(), size); 
      this->d_current += size; 
      std::streamsize remain((this->pptr() - this->pbase()) - size); 
      traits_type::move(this->pbase(), this->pptr(), remain); 
      this->setp(this->pbase(), this->epptr()); 
      this->pbump(remain); 
      this->d_condition.notify_one(); 
      return 0; 
     } 
     return traits_type::eof(); 
    } 
}; 

// ---------------------------------------------------------------------------- 

static void writer(std::ostream& out) 
{ 
    for (std::string line; std::getline(std::cin, line);) 
    { 
     out << "writer: '" << line << "'\n"; 
    } 
} 

// ---------------------------------------------------------------------------- 

static void reader(std::istream& in) 
{ 
    for (std::string line; std::getline(in, line);) 
    { 
     std::cout << "reader: '" << line << "'\n"; 
    } 
} 

// ---------------------------------------------------------------------------- 

int main() 
{ 
    try 
    { 
     threadbuf sbuf; 
     std::ostream out(&sbuf); 
     std::istream in(&sbuf); 

     std::thread write(&::writer, std::ref(out)); 
     std::thread read(&::reader, std::ref(in)); 

     write.join(); 
     sbuf.close(); 
     read.join(); 
    } 
    catch (std::exception const& ex) 
    { 
     std::cerr << "ERROR: " << ex.what() << "\n"; 
    } 
} 
+1

+1 per lo sforzo; l'OP stava certamente cercando una soluzione molto più rapida e semplice. – Walter

+0

Bene, usare il codice sopra ** è ** veloce per me ma io :) Ho visto una richiesta simile prima, cioè potrebbe essere utile anche per gli altri. ... ed è stato un esercizio interessante implementare effettivamente quello che ho appena abbozzato prima. Infine: non sono a conoscenza di una soluzione più semplice! –

+0

Sei una leggenda rivoltante Dietmar. Sto usando questo in un test unitario e funziona a meraviglia. Grazie. – MattSmith

Problemi correlati