Browse Source

Add a WrappedConnectionManager for custom Socket implementations

main 0.3.1
Alex Feldman-Crough 2 weeks ago
parent
commit
d2984d1d3c
  1. 2
      Cargo.toml
  2. 2
      src/lib.rs
  3. 6
      src/socket/mod.rs
  4. 62
      src/socket/pool.rs
  5. 6
      src/socket/recv.rs

2
Cargo.toml

@ -1,6 +1,6 @@
[package]
name = "tokio-zeromq"
version = "0.3.0"
version = "0.3.1"
authors = ["Alex Feldman-Crough <alex@fldcr.com>"]
edition = "2018"
license = "MPL-2.0"

2
src/lib.rs

@ -7,7 +7,7 @@ pub use self::socket::Socket;
pub use zmq::Context;
pub use zmq::SocketType;
#[cfg(feature="pool")]
#[cfg(feature = "pool")]
pub use self::socket::ConnectionManager;
pub mod error;

6
src/socket/mod.rs

@ -1,14 +1,14 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
pub use socket_impl::Socket;
#[cfg(feature="pool")]
#[cfg(feature = "pool")]
pub use pool::ConnectionManager;
pub use socket_impl::Socket;
pub mod recv;
pub mod recv_msg;
pub mod send;
mod socket_impl;
#[cfg(feature="pool")]
#[cfg(feature = "pool")]
mod pool;

62
src/socket/pool.rs

@ -1,9 +1,11 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
use crate::error::Error;
use super::Socket;
use async_trait::async_trait;
use std::fmt::Debug;
use super::Socket;
use crate::error::Error;
pub struct ConnectionManager {
address: String,
@ -12,18 +14,38 @@ pub struct ConnectionManager {
}
impl ConnectionManager {
pub fn new(context: zmq::Context, socket_type: zmq::SocketType, address: impl Into<String>) -> Self {
pub fn new(
context: zmq::Context,
socket_type: zmq::SocketType,
address: impl Into<String>,
) -> Self {
let address = address.into();
ConnectionManager { context, socket_type, address }
ConnectionManager {
context,
socket_type,
address,
}
}
pub fn wrap<T, E, F>(self, initialize: F) -> WrappedConnectionManager<T, E, F>
where
E: From<Error>,
F: Fn(Socket) -> Result<T, E>,
{
WrappedConnectionManager {
connection_manager: self,
initialize,
}
}
}
pub struct WrappedConnectionManager<T, E>
pub struct WrappedConnectionManager<T, E, F = fn(Socket) -> Result<T, E>>
where
Error: Into<E>,
F: Fn(Socket) -> Result<T, E>,
E: From<Error>,
{
connection_manager: ConnectionManager<T>,
initialize: Socket -> Result<T, E>,
connection_manager: ConnectionManager,
initialize: F,
}
#[async_trait]
@ -47,3 +69,27 @@ impl bb8::ManageConnection for ConnectionManager {
false
}
}
#[async_trait]
impl<T, E, F> bb8::ManageConnection for WrappedConnectionManager<T, E, F>
where
E: 'static + Debug + From<Error> + Send,
T: 'static + Send,
F: 'static + Fn(Socket) -> Result<T, E> + Send + Sync,
{
type Connection = T;
type Error = E;
async fn connect(&self) -> Result<T, E> {
let socket = self.connection_manager.connect().await?;
(self.initialize)(socket)
}
async fn is_valid(&self, _: &mut bb8::PooledConnection<'_, Self>) -> Result<(), E> {
Ok(())
}
fn has_broken(&self, _: &mut T) -> bool {
false
}
}

6
src/socket/recv.rs

@ -19,7 +19,11 @@ pub struct RecvFuture<'a, 'b> {
}
impl<'a, 'b> RecvFuture<'a, 'b> {
pub(super) fn new(socket: &'a mut zmq::Socket, async_fd: &'a mut AsyncFd<RawFd>, message: &'b mut Message) -> Self {
pub(super) fn new(
socket: &'a mut zmq::Socket,
async_fd: &'a mut AsyncFd<RawFd>,
message: &'b mut Message,
) -> Self {
RecvFuture {
socket,
async_fd,

Loading…
Cancel
Save