Simple multithreaded chat server in Rust











up vote
2
down vote

favorite












I'm trying to develop a simple Rust chat server. I'm not a Rust expert and come from Java and Kotlin.



This same server in Kotlin is:






import java.io.*
import java.net.*
import kotlin.concurrent.thread
import java.util.concurrent.ConcurrentHashMap

fun main(args: Array<String>) {
val serv = ServerSocket(Integer.parseInt(args[0]))
//val users = ConcurrentHashMap<String, PrintWriter>()
val users = Collections.synchronizedMap(HashMap<String, PrintWriter>())

while (true) {
val s = serv.accept()
thread {
var sin = s.getInputStream().bufferedReader()
val sout = PrintWriter(s.getOutputStream(), true)

// read nick
val nick = sin.readLine()
users.put(nick, sout)

sin.forEachLine {
for (peer in users.values) {
if (peer == sout) continue
peer.println(it)
}
}
users.remove(nick)
}
}
}



After many attempts I've come up with a working Rust implementation :



use std::env;
use std::io;
use std::io::Write;
use std::io::{LineWriter, BufReader, BufRead};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use std::thread;
use std::ops::DerefMut;

fn main() -> io::Result<()> {
let args: Vec<String> = env::args().collect();
let server_socket = TcpListener::bind(&*format!("localhost:{}", args[1]))?;
let users: Arc<Mutex<HashMap<String, LineWriter<TcpStream>>>> = Arc::new(Mutex::new(HashMap::new()));

for socket in server_socket.incoming() {
let users = users.clone();
thread::spawn(move || -> io::Result<()> {
let socket = socket?;
let socket_copy = socket.try_clone()?;
let mut inp = BufReader::new(socket);
let out = LineWriter::new(socket_copy);

// read nick
let mut nick = String::new();
inp.read_line(&mut nick)?;
nick.pop(); // discard 'n'
let nick_copy = nick.clone();
{
let mut users = users.lock().unwrap();
users.insert(nick, out);
}

for line in inp.lines() {
{
let line = line?;
let mut users = users.lock().unwrap();
for (nick, peer) in users.deref_mut() {
if *nick == nick_copy { continue; }
writeln!(peer, "{}", line)?;
}
}
}

// remove nick
{
let mut users = users.lock().unwrap();
users.remove(&nick_copy);
}
Ok(())
});
}
Ok(())
}


Too verbose for my taste.



I'm particularly dissatisfied with those clone ops. When to move? When to borrow?



I'd prefer to use for peer in users.values_mut() instead of for (nick, peer) in users.deref_mut() but then I've problems when comparing LineWriter references to discard sender.



I'd also prefer to use RwLock instead of Mutex.



Any pointers to simplify/enhance code will be greatly appreciated.










share|improve this question




























    up vote
    2
    down vote

    favorite












    I'm trying to develop a simple Rust chat server. I'm not a Rust expert and come from Java and Kotlin.



    This same server in Kotlin is:






    import java.io.*
    import java.net.*
    import kotlin.concurrent.thread
    import java.util.concurrent.ConcurrentHashMap

    fun main(args: Array<String>) {
    val serv = ServerSocket(Integer.parseInt(args[0]))
    //val users = ConcurrentHashMap<String, PrintWriter>()
    val users = Collections.synchronizedMap(HashMap<String, PrintWriter>())

    while (true) {
    val s = serv.accept()
    thread {
    var sin = s.getInputStream().bufferedReader()
    val sout = PrintWriter(s.getOutputStream(), true)

    // read nick
    val nick = sin.readLine()
    users.put(nick, sout)

    sin.forEachLine {
    for (peer in users.values) {
    if (peer == sout) continue
    peer.println(it)
    }
    }
    users.remove(nick)
    }
    }
    }



    After many attempts I've come up with a working Rust implementation :



    use std::env;
    use std::io;
    use std::io::Write;
    use std::io::{LineWriter, BufReader, BufRead};
    use std::net::{TcpListener, TcpStream};
    use std::sync::{Arc, Mutex};
    use std::collections::HashMap;
    use std::thread;
    use std::ops::DerefMut;

    fn main() -> io::Result<()> {
    let args: Vec<String> = env::args().collect();
    let server_socket = TcpListener::bind(&*format!("localhost:{}", args[1]))?;
    let users: Arc<Mutex<HashMap<String, LineWriter<TcpStream>>>> = Arc::new(Mutex::new(HashMap::new()));

    for socket in server_socket.incoming() {
    let users = users.clone();
    thread::spawn(move || -> io::Result<()> {
    let socket = socket?;
    let socket_copy = socket.try_clone()?;
    let mut inp = BufReader::new(socket);
    let out = LineWriter::new(socket_copy);

    // read nick
    let mut nick = String::new();
    inp.read_line(&mut nick)?;
    nick.pop(); // discard 'n'
    let nick_copy = nick.clone();
    {
    let mut users = users.lock().unwrap();
    users.insert(nick, out);
    }

    for line in inp.lines() {
    {
    let line = line?;
    let mut users = users.lock().unwrap();
    for (nick, peer) in users.deref_mut() {
    if *nick == nick_copy { continue; }
    writeln!(peer, "{}", line)?;
    }
    }
    }

    // remove nick
    {
    let mut users = users.lock().unwrap();
    users.remove(&nick_copy);
    }
    Ok(())
    });
    }
    Ok(())
    }


    Too verbose for my taste.



    I'm particularly dissatisfied with those clone ops. When to move? When to borrow?



    I'd prefer to use for peer in users.values_mut() instead of for (nick, peer) in users.deref_mut() but then I've problems when comparing LineWriter references to discard sender.



    I'd also prefer to use RwLock instead of Mutex.



    Any pointers to simplify/enhance code will be greatly appreciated.










    share|improve this question


























      up vote
      2
      down vote

      favorite









      up vote
      2
      down vote

      favorite











      I'm trying to develop a simple Rust chat server. I'm not a Rust expert and come from Java and Kotlin.



      This same server in Kotlin is:






      import java.io.*
      import java.net.*
      import kotlin.concurrent.thread
      import java.util.concurrent.ConcurrentHashMap

      fun main(args: Array<String>) {
      val serv = ServerSocket(Integer.parseInt(args[0]))
      //val users = ConcurrentHashMap<String, PrintWriter>()
      val users = Collections.synchronizedMap(HashMap<String, PrintWriter>())

      while (true) {
      val s = serv.accept()
      thread {
      var sin = s.getInputStream().bufferedReader()
      val sout = PrintWriter(s.getOutputStream(), true)

      // read nick
      val nick = sin.readLine()
      users.put(nick, sout)

      sin.forEachLine {
      for (peer in users.values) {
      if (peer == sout) continue
      peer.println(it)
      }
      }
      users.remove(nick)
      }
      }
      }



      After many attempts I've come up with a working Rust implementation :



      use std::env;
      use std::io;
      use std::io::Write;
      use std::io::{LineWriter, BufReader, BufRead};
      use std::net::{TcpListener, TcpStream};
      use std::sync::{Arc, Mutex};
      use std::collections::HashMap;
      use std::thread;
      use std::ops::DerefMut;

      fn main() -> io::Result<()> {
      let args: Vec<String> = env::args().collect();
      let server_socket = TcpListener::bind(&*format!("localhost:{}", args[1]))?;
      let users: Arc<Mutex<HashMap<String, LineWriter<TcpStream>>>> = Arc::new(Mutex::new(HashMap::new()));

      for socket in server_socket.incoming() {
      let users = users.clone();
      thread::spawn(move || -> io::Result<()> {
      let socket = socket?;
      let socket_copy = socket.try_clone()?;
      let mut inp = BufReader::new(socket);
      let out = LineWriter::new(socket_copy);

      // read nick
      let mut nick = String::new();
      inp.read_line(&mut nick)?;
      nick.pop(); // discard 'n'
      let nick_copy = nick.clone();
      {
      let mut users = users.lock().unwrap();
      users.insert(nick, out);
      }

      for line in inp.lines() {
      {
      let line = line?;
      let mut users = users.lock().unwrap();
      for (nick, peer) in users.deref_mut() {
      if *nick == nick_copy { continue; }
      writeln!(peer, "{}", line)?;
      }
      }
      }

      // remove nick
      {
      let mut users = users.lock().unwrap();
      users.remove(&nick_copy);
      }
      Ok(())
      });
      }
      Ok(())
      }


      Too verbose for my taste.



      I'm particularly dissatisfied with those clone ops. When to move? When to borrow?



      I'd prefer to use for peer in users.values_mut() instead of for (nick, peer) in users.deref_mut() but then I've problems when comparing LineWriter references to discard sender.



      I'd also prefer to use RwLock instead of Mutex.



      Any pointers to simplify/enhance code will be greatly appreciated.










      share|improve this question















      I'm trying to develop a simple Rust chat server. I'm not a Rust expert and come from Java and Kotlin.



      This same server in Kotlin is:






      import java.io.*
      import java.net.*
      import kotlin.concurrent.thread
      import java.util.concurrent.ConcurrentHashMap

      fun main(args: Array<String>) {
      val serv = ServerSocket(Integer.parseInt(args[0]))
      //val users = ConcurrentHashMap<String, PrintWriter>()
      val users = Collections.synchronizedMap(HashMap<String, PrintWriter>())

      while (true) {
      val s = serv.accept()
      thread {
      var sin = s.getInputStream().bufferedReader()
      val sout = PrintWriter(s.getOutputStream(), true)

      // read nick
      val nick = sin.readLine()
      users.put(nick, sout)

      sin.forEachLine {
      for (peer in users.values) {
      if (peer == sout) continue
      peer.println(it)
      }
      }
      users.remove(nick)
      }
      }
      }



      After many attempts I've come up with a working Rust implementation :



      use std::env;
      use std::io;
      use std::io::Write;
      use std::io::{LineWriter, BufReader, BufRead};
      use std::net::{TcpListener, TcpStream};
      use std::sync::{Arc, Mutex};
      use std::collections::HashMap;
      use std::thread;
      use std::ops::DerefMut;

      fn main() -> io::Result<()> {
      let args: Vec<String> = env::args().collect();
      let server_socket = TcpListener::bind(&*format!("localhost:{}", args[1]))?;
      let users: Arc<Mutex<HashMap<String, LineWriter<TcpStream>>>> = Arc::new(Mutex::new(HashMap::new()));

      for socket in server_socket.incoming() {
      let users = users.clone();
      thread::spawn(move || -> io::Result<()> {
      let socket = socket?;
      let socket_copy = socket.try_clone()?;
      let mut inp = BufReader::new(socket);
      let out = LineWriter::new(socket_copy);

      // read nick
      let mut nick = String::new();
      inp.read_line(&mut nick)?;
      nick.pop(); // discard 'n'
      let nick_copy = nick.clone();
      {
      let mut users = users.lock().unwrap();
      users.insert(nick, out);
      }

      for line in inp.lines() {
      {
      let line = line?;
      let mut users = users.lock().unwrap();
      for (nick, peer) in users.deref_mut() {
      if *nick == nick_copy { continue; }
      writeln!(peer, "{}", line)?;
      }
      }
      }

      // remove nick
      {
      let mut users = users.lock().unwrap();
      users.remove(&nick_copy);
      }
      Ok(())
      });
      }
      Ok(())
      }


      Too verbose for my taste.



      I'm particularly dissatisfied with those clone ops. When to move? When to borrow?



      I'd prefer to use for peer in users.values_mut() instead of for (nick, peer) in users.deref_mut() but then I've problems when comparing LineWriter references to discard sender.



      I'd also prefer to use RwLock instead of Mutex.



      Any pointers to simplify/enhance code will be greatly appreciated.







      rust socket server tcp chat






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited yesterday









      200_success

      127k15149412




      127k15149412










      asked yesterday









      francesc

      262




      262






















          1 Answer
          1






          active

          oldest

          votes

















          up vote
          0
          down vote













          I made some changes to your code (see below). You can check the code for what I've done so far: I added some comments on it so you can compare it with your version. I'll note some important points in here. Please ask if you have any questions; I will try to explain or we can improve the code together.




          • Propagating error to the main function from a spawned thread is not possible. It's only possible after joined to the main thread (you can use join handler then propagate your anonymous function's return value to the main).

          • Created ConnectionsSync struct as helper to get rid of Arc<RwLock<HashMap< initialization: it was looking too messy; also with this way you can add behaviors like (add_user, broadcast_message, remove_user, ... )


          • Incoming has Iterator implementation so I can use it as iterator and I used try_for_each to tell that I am going to apply fallible function, then it will be propagated to main.


          Code



          use std::collections::HashMap;
          use std::env;
          use std::io;
          use std::io::{BufRead, BufReader, LineWriter, Write};
          use std::net::{TcpListener, TcpStream};
          use std::sync::{Arc, RwLock};
          use std::thread;

          struct ConnectionsSync(Arc<RwLock<HashMap<String, LineWriter<TcpStream>>>>);
          impl ConnectionsSync {
          fn new() -> Self {
          ConnectionsSync(Arc::new(RwLock::new(HashMap::new())))
          }
          }

          fn main() -> io::Result<()> {
          let port = env::args().nth(1).unwrap_or("8080".to_string()); //default port 8080
          let connections = ConnectionsSync::new();

          TcpListener::bind(&format!("localhost:{}", port))?
          .incoming()
          .try_for_each(|socket| -> io::Result<()> {
          let socket = socket?;
          let connections = connections.0.clone(); //increases ref count

          thread::spawn(move || {
          let (mut reader, writer) = socket
          .try_clone()
          .map(|socket1| (BufReader::new(socket1), LineWriter::new(socket)))
          .unwrap();

          // read nick
          let mut nick = String::new();
          reader
          .read_line(&mut nick)
          .and_then(|_| Ok(nick.pop()))
          .expect("Not able to read user nick");
          let current_user = nick.clone();

          //add user
          connections
          .write()
          .unwrap()
          .insert(current_user.clone(), writer);

          //read user input
          reader.lines().into_iter().for_each(|line| {
          let line = line.expect("Not able to read user input");
          connections
          .write()
          .unwrap()
          .iter_mut()
          .filter(|(other_user, _)| **other_user != current_user)
          .for_each(|(_, peer)| {
          writeln!(peer, "{} t-> {}", current_user, line);
          })
          });

          //discard user
          connections.write().unwrap().remove(&mut nick);
          });

          Ok(())
          })
          }





          share|improve this answer










          New contributor




          Ömer Erden is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
          Check out our Code of Conduct.


















          • I see...a functional aproach. Some questions: 1.- I guess scoped threads (crossbeam) would get rid of Arc. 2.- Can we get rid of nick.clone() and keep a simple reference? I guess not because cannot move out of borrowed content despite nick being immutable. 3.- can we filter on value references instead of key objects? Thanks for the review, much clearer code
            – francesc
            17 hours ago













          Your Answer





          StackExchange.ifUsing("editor", function () {
          return StackExchange.using("mathjaxEditing", function () {
          StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix) {
          StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
          });
          });
          }, "mathjax-editing");

          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "196"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          convertImagesToLinks: false,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: null,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f209016%2fsimple-multithreaded-chat-server-in-rust%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes








          up vote
          0
          down vote













          I made some changes to your code (see below). You can check the code for what I've done so far: I added some comments on it so you can compare it with your version. I'll note some important points in here. Please ask if you have any questions; I will try to explain or we can improve the code together.




          • Propagating error to the main function from a spawned thread is not possible. It's only possible after joined to the main thread (you can use join handler then propagate your anonymous function's return value to the main).

          • Created ConnectionsSync struct as helper to get rid of Arc<RwLock<HashMap< initialization: it was looking too messy; also with this way you can add behaviors like (add_user, broadcast_message, remove_user, ... )


          • Incoming has Iterator implementation so I can use it as iterator and I used try_for_each to tell that I am going to apply fallible function, then it will be propagated to main.


          Code



          use std::collections::HashMap;
          use std::env;
          use std::io;
          use std::io::{BufRead, BufReader, LineWriter, Write};
          use std::net::{TcpListener, TcpStream};
          use std::sync::{Arc, RwLock};
          use std::thread;

          struct ConnectionsSync(Arc<RwLock<HashMap<String, LineWriter<TcpStream>>>>);
          impl ConnectionsSync {
          fn new() -> Self {
          ConnectionsSync(Arc::new(RwLock::new(HashMap::new())))
          }
          }

          fn main() -> io::Result<()> {
          let port = env::args().nth(1).unwrap_or("8080".to_string()); //default port 8080
          let connections = ConnectionsSync::new();

          TcpListener::bind(&format!("localhost:{}", port))?
          .incoming()
          .try_for_each(|socket| -> io::Result<()> {
          let socket = socket?;
          let connections = connections.0.clone(); //increases ref count

          thread::spawn(move || {
          let (mut reader, writer) = socket
          .try_clone()
          .map(|socket1| (BufReader::new(socket1), LineWriter::new(socket)))
          .unwrap();

          // read nick
          let mut nick = String::new();
          reader
          .read_line(&mut nick)
          .and_then(|_| Ok(nick.pop()))
          .expect("Not able to read user nick");
          let current_user = nick.clone();

          //add user
          connections
          .write()
          .unwrap()
          .insert(current_user.clone(), writer);

          //read user input
          reader.lines().into_iter().for_each(|line| {
          let line = line.expect("Not able to read user input");
          connections
          .write()
          .unwrap()
          .iter_mut()
          .filter(|(other_user, _)| **other_user != current_user)
          .for_each(|(_, peer)| {
          writeln!(peer, "{} t-> {}", current_user, line);
          })
          });

          //discard user
          connections.write().unwrap().remove(&mut nick);
          });

          Ok(())
          })
          }





          share|improve this answer










          New contributor




          Ömer Erden is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
          Check out our Code of Conduct.


















          • I see...a functional aproach. Some questions: 1.- I guess scoped threads (crossbeam) would get rid of Arc. 2.- Can we get rid of nick.clone() and keep a simple reference? I guess not because cannot move out of borrowed content despite nick being immutable. 3.- can we filter on value references instead of key objects? Thanks for the review, much clearer code
            – francesc
            17 hours ago

















          up vote
          0
          down vote













          I made some changes to your code (see below). You can check the code for what I've done so far: I added some comments on it so you can compare it with your version. I'll note some important points in here. Please ask if you have any questions; I will try to explain or we can improve the code together.




          • Propagating error to the main function from a spawned thread is not possible. It's only possible after joined to the main thread (you can use join handler then propagate your anonymous function's return value to the main).

          • Created ConnectionsSync struct as helper to get rid of Arc<RwLock<HashMap< initialization: it was looking too messy; also with this way you can add behaviors like (add_user, broadcast_message, remove_user, ... )


          • Incoming has Iterator implementation so I can use it as iterator and I used try_for_each to tell that I am going to apply fallible function, then it will be propagated to main.


          Code



          use std::collections::HashMap;
          use std::env;
          use std::io;
          use std::io::{BufRead, BufReader, LineWriter, Write};
          use std::net::{TcpListener, TcpStream};
          use std::sync::{Arc, RwLock};
          use std::thread;

          struct ConnectionsSync(Arc<RwLock<HashMap<String, LineWriter<TcpStream>>>>);
          impl ConnectionsSync {
          fn new() -> Self {
          ConnectionsSync(Arc::new(RwLock::new(HashMap::new())))
          }
          }

          fn main() -> io::Result<()> {
          let port = env::args().nth(1).unwrap_or("8080".to_string()); //default port 8080
          let connections = ConnectionsSync::new();

          TcpListener::bind(&format!("localhost:{}", port))?
          .incoming()
          .try_for_each(|socket| -> io::Result<()> {
          let socket = socket?;
          let connections = connections.0.clone(); //increases ref count

          thread::spawn(move || {
          let (mut reader, writer) = socket
          .try_clone()
          .map(|socket1| (BufReader::new(socket1), LineWriter::new(socket)))
          .unwrap();

          // read nick
          let mut nick = String::new();
          reader
          .read_line(&mut nick)
          .and_then(|_| Ok(nick.pop()))
          .expect("Not able to read user nick");
          let current_user = nick.clone();

          //add user
          connections
          .write()
          .unwrap()
          .insert(current_user.clone(), writer);

          //read user input
          reader.lines().into_iter().for_each(|line| {
          let line = line.expect("Not able to read user input");
          connections
          .write()
          .unwrap()
          .iter_mut()
          .filter(|(other_user, _)| **other_user != current_user)
          .for_each(|(_, peer)| {
          writeln!(peer, "{} t-> {}", current_user, line);
          })
          });

          //discard user
          connections.write().unwrap().remove(&mut nick);
          });

          Ok(())
          })
          }





          share|improve this answer










          New contributor




          Ömer Erden is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
          Check out our Code of Conduct.


















          • I see...a functional aproach. Some questions: 1.- I guess scoped threads (crossbeam) would get rid of Arc. 2.- Can we get rid of nick.clone() and keep a simple reference? I guess not because cannot move out of borrowed content despite nick being immutable. 3.- can we filter on value references instead of key objects? Thanks for the review, much clearer code
            – francesc
            17 hours ago















          up vote
          0
          down vote










          up vote
          0
          down vote









          I made some changes to your code (see below). You can check the code for what I've done so far: I added some comments on it so you can compare it with your version. I'll note some important points in here. Please ask if you have any questions; I will try to explain or we can improve the code together.




          • Propagating error to the main function from a spawned thread is not possible. It's only possible after joined to the main thread (you can use join handler then propagate your anonymous function's return value to the main).

          • Created ConnectionsSync struct as helper to get rid of Arc<RwLock<HashMap< initialization: it was looking too messy; also with this way you can add behaviors like (add_user, broadcast_message, remove_user, ... )


          • Incoming has Iterator implementation so I can use it as iterator and I used try_for_each to tell that I am going to apply fallible function, then it will be propagated to main.


          Code



          use std::collections::HashMap;
          use std::env;
          use std::io;
          use std::io::{BufRead, BufReader, LineWriter, Write};
          use std::net::{TcpListener, TcpStream};
          use std::sync::{Arc, RwLock};
          use std::thread;

          struct ConnectionsSync(Arc<RwLock<HashMap<String, LineWriter<TcpStream>>>>);
          impl ConnectionsSync {
          fn new() -> Self {
          ConnectionsSync(Arc::new(RwLock::new(HashMap::new())))
          }
          }

          fn main() -> io::Result<()> {
          let port = env::args().nth(1).unwrap_or("8080".to_string()); //default port 8080
          let connections = ConnectionsSync::new();

          TcpListener::bind(&format!("localhost:{}", port))?
          .incoming()
          .try_for_each(|socket| -> io::Result<()> {
          let socket = socket?;
          let connections = connections.0.clone(); //increases ref count

          thread::spawn(move || {
          let (mut reader, writer) = socket
          .try_clone()
          .map(|socket1| (BufReader::new(socket1), LineWriter::new(socket)))
          .unwrap();

          // read nick
          let mut nick = String::new();
          reader
          .read_line(&mut nick)
          .and_then(|_| Ok(nick.pop()))
          .expect("Not able to read user nick");
          let current_user = nick.clone();

          //add user
          connections
          .write()
          .unwrap()
          .insert(current_user.clone(), writer);

          //read user input
          reader.lines().into_iter().for_each(|line| {
          let line = line.expect("Not able to read user input");
          connections
          .write()
          .unwrap()
          .iter_mut()
          .filter(|(other_user, _)| **other_user != current_user)
          .for_each(|(_, peer)| {
          writeln!(peer, "{} t-> {}", current_user, line);
          })
          });

          //discard user
          connections.write().unwrap().remove(&mut nick);
          });

          Ok(())
          })
          }





          share|improve this answer










          New contributor




          Ömer Erden is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
          Check out our Code of Conduct.









          I made some changes to your code (see below). You can check the code for what I've done so far: I added some comments on it so you can compare it with your version. I'll note some important points in here. Please ask if you have any questions; I will try to explain or we can improve the code together.




          • Propagating error to the main function from a spawned thread is not possible. It's only possible after joined to the main thread (you can use join handler then propagate your anonymous function's return value to the main).

          • Created ConnectionsSync struct as helper to get rid of Arc<RwLock<HashMap< initialization: it was looking too messy; also with this way you can add behaviors like (add_user, broadcast_message, remove_user, ... )


          • Incoming has Iterator implementation so I can use it as iterator and I used try_for_each to tell that I am going to apply fallible function, then it will be propagated to main.


          Code



          use std::collections::HashMap;
          use std::env;
          use std::io;
          use std::io::{BufRead, BufReader, LineWriter, Write};
          use std::net::{TcpListener, TcpStream};
          use std::sync::{Arc, RwLock};
          use std::thread;

          struct ConnectionsSync(Arc<RwLock<HashMap<String, LineWriter<TcpStream>>>>);
          impl ConnectionsSync {
          fn new() -> Self {
          ConnectionsSync(Arc::new(RwLock::new(HashMap::new())))
          }
          }

          fn main() -> io::Result<()> {
          let port = env::args().nth(1).unwrap_or("8080".to_string()); //default port 8080
          let connections = ConnectionsSync::new();

          TcpListener::bind(&format!("localhost:{}", port))?
          .incoming()
          .try_for_each(|socket| -> io::Result<()> {
          let socket = socket?;
          let connections = connections.0.clone(); //increases ref count

          thread::spawn(move || {
          let (mut reader, writer) = socket
          .try_clone()
          .map(|socket1| (BufReader::new(socket1), LineWriter::new(socket)))
          .unwrap();

          // read nick
          let mut nick = String::new();
          reader
          .read_line(&mut nick)
          .and_then(|_| Ok(nick.pop()))
          .expect("Not able to read user nick");
          let current_user = nick.clone();

          //add user
          connections
          .write()
          .unwrap()
          .insert(current_user.clone(), writer);

          //read user input
          reader.lines().into_iter().for_each(|line| {
          let line = line.expect("Not able to read user input");
          connections
          .write()
          .unwrap()
          .iter_mut()
          .filter(|(other_user, _)| **other_user != current_user)
          .for_each(|(_, peer)| {
          writeln!(peer, "{} t-> {}", current_user, line);
          })
          });

          //discard user
          connections.write().unwrap().remove(&mut nick);
          });

          Ok(())
          })
          }






          share|improve this answer










          New contributor




          Ömer Erden is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
          Check out our Code of Conduct.









          share|improve this answer



          share|improve this answer








          edited 19 hours ago









          Toby Speight

          23.1k538110




          23.1k538110






          New contributor




          Ömer Erden is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
          Check out our Code of Conduct.









          answered 20 hours ago









          Ömer Erden

          101




          101




          New contributor




          Ömer Erden is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
          Check out our Code of Conduct.





          New contributor





          Ömer Erden is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
          Check out our Code of Conduct.






          Ömer Erden is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
          Check out our Code of Conduct.












          • I see...a functional aproach. Some questions: 1.- I guess scoped threads (crossbeam) would get rid of Arc. 2.- Can we get rid of nick.clone() and keep a simple reference? I guess not because cannot move out of borrowed content despite nick being immutable. 3.- can we filter on value references instead of key objects? Thanks for the review, much clearer code
            – francesc
            17 hours ago




















          • I see...a functional aproach. Some questions: 1.- I guess scoped threads (crossbeam) would get rid of Arc. 2.- Can we get rid of nick.clone() and keep a simple reference? I guess not because cannot move out of borrowed content despite nick being immutable. 3.- can we filter on value references instead of key objects? Thanks for the review, much clearer code
            – francesc
            17 hours ago


















          I see...a functional aproach. Some questions: 1.- I guess scoped threads (crossbeam) would get rid of Arc. 2.- Can we get rid of nick.clone() and keep a simple reference? I guess not because cannot move out of borrowed content despite nick being immutable. 3.- can we filter on value references instead of key objects? Thanks for the review, much clearer code
          – francesc
          17 hours ago






          I see...a functional aproach. Some questions: 1.- I guess scoped threads (crossbeam) would get rid of Arc. 2.- Can we get rid of nick.clone() and keep a simple reference? I guess not because cannot move out of borrowed content despite nick being immutable. 3.- can we filter on value references instead of key objects? Thanks for the review, much clearer code
          – francesc
          17 hours ago




















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Code Review Stack Exchange!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          Use MathJax to format equations. MathJax reference.


          To learn more, see our tips on writing great answers.





          Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


          Please pay close attention to the following guidance:


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f209016%2fsimple-multithreaded-chat-server-in-rust%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Morgemoulin

          Scott Moir

          Souastre