Rust
Module reorganization
We reorganized the module tree, so import paths have changed. The main difference is that everything should now be imported via the root path zenoh::. Here are some examples; see zenoh/src/lib.rs for the complete list of changes.
// common use
use zenoh::config::*;
use zenoh::{Config, Error, Result};
// key_expr & selector
use zenoh::key_expr::{
    format::{kedefine, keformat},
    keyexpr, KeyExpr, OwnedKeyExpr,
};
// session
use zenoh::session::{init, open, EntityId, Session, SessionInfo};
// publisher & subscriber
use zenoh::pubsub::{Publisher, Reliability, Subscriber};
// query & queryable & selectors
use zenoh::query::{
    ConsolidationMode, Parameters, Query, QueryConsolidation, QueryTarget, Queryable, Reply,
    Selector,
};
// ZBytes & encoding
use zenoh::bytes::{ZBytes, Encoding};
// sample
use zenoh::sample::{Locality, Sample};
// quality of service
use zenoh::qos::{CongestionControl, Priority, QoSBuilderTrait};
Removal of the sync and async preludes
Zenoh preludes have been deprecated and are no longer used in the API. The API is now asynchronous-first: all operations such as put/get/etc. can be awaited directly.
Making a synchronous call now requires importing zenoh::Wait and calling the wait() method, which replaces the old res() method.
To make the migration easier, a deprecation warning is emitted if you use the old API convention.
// (deprecated) async
use zenoh::prelude::r#async::*;
let session = zenoh::open(config).res().await.unwrap();
let publisher = session.declare_publisher(&key_expr).res().await.unwrap();
publisher.put(buf).res().await.unwrap();
// (deprecated) sync
use zenoh::prelude::sync::*;
let session = zenoh::open(config).res().unwrap();
let publisher = session.declare_publisher(&key_expr).res().unwrap();
publisher.put(buf).res().unwrap();
- Zenoh 1.0.0
// async
// Difference 1: No more res()
let session = zenoh::open(config).await.unwrap();
let publisher = session.declare_publisher(&key_expr).await.unwrap();
publisher.put(buf).await.unwrap();
// sync
// Difference 2: use wait() for synchronous API
use zenoh::Wait;
let session = zenoh::open(config).wait().unwrap();
let publisher = session.declare_publisher(&key_expr).wait().unwrap();
publisher.put(buf).wait().unwrap();
Session is now clonable and can be closed easily
Session now implements Clone, so there is no longer any need to wrap it in an Arc<Session>, and Session::into_arc has been deprecated. All session methods except Session::close work as before, so only the session type needs to be changed.
As a side effect, Subscriber and Queryable no longer have a generic lifetime parameter. Publisher also loses one of its lifetime parameters, keeping only the one for its key expression.
The session is now closed automatically when the last Session instance is dropped, even if publishers/subscribers/etc. are still alive. A session can also be closed manually using Session::close, which now takes an immutable reference, so it can be called at any time, even if publishers/subscribers/etc. are still alive.
Subscribers and queryables of a closed session no longer receive data; calling Session::get, Session::put or Publisher::put results in an error. Closing a session on the fly may save bandwidth on the wire, as it avoids propagating the undeclaration of remaining entities such as subscribers/queryables/etc.
let session = zenoh::open(zenoh::config::peer()).await.unwrap();
let subscriber = session
    .declare_subscriber("key/expression")
    .await
    .unwrap();
let subscriber_task = tokio::spawn(async move {
    while let Ok(sample) = subscriber.recv_async().await {
        println!("Received: {} {:?}", sample.key_expr(), sample.payload());
    }
});
// session can be closed while subscriber is still running, preventing it
// from receiving more data
session.close().await.unwrap();
// subscriber task will end as `subscriber.recv_async()` will return `Err`
// **when all remaining data has been processed**.
// subscriber undeclaration has not been sent on the wire
subscriber_task.await.unwrap();
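To make the sharing semantics described above concrete, here is a minimal std-only sketch. The Handle type and its methods are hypothetical and are not Zenoh's Session; the sketch only models two points from the text: clones share one underlying state, and close takes an immutable reference, so any clone can close at any time, after which put fails.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical handle modelling a cloneable session; NOT Zenoh's `Session`.
#[derive(Clone)]
pub struct Handle {
    // Shared between all clones, like the state behind a cloneable session.
    closed: Arc<AtomicBool>,
}

impl Handle {
    pub fn open() -> Self {
        Handle {
            closed: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Takes `&self`, so any clone can close the shared state at any time.
    pub fn close(&self) {
        self.closed.store(true, Ordering::SeqCst);
    }

    /// Returns an error once the shared state has been closed.
    pub fn put(&self, _payload: &[u8]) -> Result<(), &'static str> {
        if self.closed.load(Ordering::SeqCst) {
            Err("session closed")
        } else {
            Ok(())
        }
    }
}
```

In this model, closing through one clone makes put fail on every other clone, which mirrors why no Arc wrapper is needed around a type that is already a shared handle.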
Callbacks run in background until session is closed
Session entities (e.g. subscribers) declared with callbacks are no longer undeclared when they are dropped; there is no longer any need to keep a reference to an entity when the intent is to have it run until the session is closed.
let session = zenoh::open(zenoh::config::default()).await.unwrap();
session
    .declare_subscriber("key/expression")
    .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()) })
    .await
    .unwrap();
// subscriber runs in background until the session is closed
// no need to keep a variable around
If you still want the entity to be undeclared when dropped, you can simply use with instead of callback; you may just need to annotate the callback's argument type, as type inference is not as good as with the callback method.
let session = zenoh::open(zenoh::config::default()).await.unwrap();
let subscriber = session
    .declare_subscriber("key/expression")
    // annotation needed
    .with(|sample: Sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()) })
    .await
    .unwrap();
// subscriber is undeclared when dropped
In more detail, a new method undeclare_on_drop(bool), which defaults to true, has been added to the builders, and callback(cb) is now simply a shortcut for with(cb).undeclare_on_drop(false).
However, most users will rarely need to call this method directly.
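The effect of the undeclare_on_drop flag can be illustrated with a small std-only model. Everything here (Registry, Entity, declare) is hypothetical and is not Zenoh's implementation; the sketch only shows the general pattern of a Drop implementation that conditionally removes an entry from a shared table.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical table of declared entities, standing in for session state.
pub type Registry = Rc<RefCell<Vec<String>>>;

/// Hypothetical entity handle; NOT Zenoh's `Subscriber`.
pub struct Entity {
    name: String,
    registry: Registry,
    undeclare_on_drop: bool,
}

impl Entity {
    /// Registers `name` and returns a handle that remembers the drop policy.
    pub fn declare(name: &str, registry: &Registry, undeclare_on_drop: bool) -> Self {
        registry.borrow_mut().push(name.to_string());
        Entity {
            name: name.to_string(),
            registry: Rc::clone(registry),
            undeclare_on_drop,
        }
    }
}

impl Drop for Entity {
    fn drop(&mut self) {
        // Only remove the entry when the policy asks for it; otherwise the
        // entry stays in the table after the handle goes away.
        if self.undeclare_on_drop {
            self.registry.borrow_mut().retain(|n| n != &self.name);
        }
    }
}
```

With this model, a handle declared with the flag set to false leaves its entry in the registry after it goes out of scope, which corresponds to the "runs in background" behaviour described for callback.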
Value is gone, long live ZBytes
Value has been split into ZBytes and Encoding. put and other operations now require a ZBytes payload, and builders accept an optional Encoding parameter. The encoding is no longer automatically deduced from the payload type.
ZBytes is a raw bytes container, which can also contain non-contiguous regions of memory. It can be created directly from raw bytes/strings using ZBytes::from. The bytes can be retrieved using ZBytes::to_bytes, which returns a Cow<[u8]>, as a copy may have to be made if the underlying bytes are not contiguous.
- Zenoh 0.11.x
let sample = subscriber.recv_async().await.unwrap();
let value: Value = sample.value;
let raw_bytes: Vec<u8> = value.try_into().unwrap();
- Zenoh 1.0.0
let sample = subscriber.recv_async().await.unwrap();
let zbytes: &ZBytes = sample.payload();
let raw_bytes: Cow<[u8]> = zbytes.to_bytes();
You can look at a full set of examples in examples/examples/z_bytes.rs.
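As an illustration of why a Cow<[u8]> is a natural return type for a container that may hold non-contiguous memory, here is a std-only sketch. The Chunked type is hypothetical and is not how ZBytes is implemented; it only shows the borrow-when-contiguous, copy-otherwise idea.

```rust
use std::borrow::Cow;

/// Hypothetical payload made of one or more memory regions.
/// This is NOT Zenoh's `ZBytes`; it only models the borrow-or-copy choice.
pub struct Chunked {
    regions: Vec<Vec<u8>>,
}

impl Chunked {
    pub fn new(regions: Vec<Vec<u8>>) -> Self {
        Chunked { regions }
    }

    /// Borrows when the data lives in a single region, copies otherwise.
    pub fn to_bytes(&self) -> Cow<'_, [u8]> {
        match self.regions.as_slice() {
            // One region: the bytes are already contiguous, no copy needed.
            [only] => Cow::Borrowed(only.as_slice()),
            // Zero or several regions: build one contiguous buffer.
            many => Cow::Owned(many.concat()),
        }
    }
}
```

Callers that only read the bytes can treat both cases uniformly through deref to a slice, and call into_owned() only when they need a Vec<u8>.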
Serialization
Zenoh provides serialization, for convenience, as an extension in the zenoh-ext crate. Serialization is implemented for a number of standard types such as integers, floats, Vec, HashMap, etc., and is used through the functions z_serialize/z_deserialize.
let input: Vec<f32> = vec![0.0, 1.5, 42.0];
let payload: ZBytes = z_serialize(&input);
let output: Vec<f32> = z_deserialize(&payload).unwrap();
zenoh-ext serialization doesn’t pretend to cover all use cases; it is just one available choice among other serialization formats like JSON, Protobuf, CBOR, etc. In the end, Zenoh just sends and receives raw payload bytes, independently of the serialization used.
NOTE: ⚠️ Serializing a Vec<u8> is not the same as creating a ZBytes from a Vec<u8>: the resulting ZBytes are different, and serialization doesn’t take ownership of the bytes.
Encoding
Encoding has been reworked.
Zenoh does not impose any encoding requirement on the user, nor does it operate on the encoding.
It can be thought of as optional metadata, carried by Zenoh so that the end user’s application can perform different operations based on the encoding.
We have expanded our list of pre-defined encoding types from Zenoh 0.11.0 for user convenience.
The module path and name of the encoding have also changed.
- Zenoh 0.11.x
use zenoh::prelude::KnownEncoding;
session
    .put(&key_expr, payload)
    .encoding(KnownEncoding::AppOctetStream)
    .res()
    .await
    .unwrap();
- Zenoh 1.0.0
use zenoh::bytes::Encoding;
session
    .put(&key_expr, payload)
    .encoding(Encoding::APPLICATION_OCTET_STREAM)
    .await
    .unwrap();
Users can also define their own encoding scheme that does not need to be based on the pre-defined variants.
let encoding = Encoding::from("pointcloud/LAS");
Because encoding is now optional for put, a Publisher can be declared with a default encoding, which will be used in every Publisher::put.
let publisher = session
    .declare_publisher("my/keyexpr")
    .encoding(Encoding::APPLICATION_JSON)
    .await
    .unwrap();
// default encoding from publisher `application/json`
publisher.put(serde_json::to_vec(&json!({"key": "value"})).unwrap()).await.unwrap();
Attachment
In Zenoh 0.11.x, the AttachmentBuilder was required to create an attachment.
In Zenoh 1.0.0, we have removed AttachmentBuilder, and an attachment can be created from anything that implements Into<ZBytes>.
- Zenoh 0.11.x
let mut attachment = AttachmentBuilder::new();
attachment.insert("key1", "value1");
attachment.insert("key2", "value2");
publisher.put(payload)
    .with_attachment(attachment.build())
    .res()
    .await
    .unwrap();
- Zenoh 1.0.0
// Difference 1: No AttachmentBuilder anymore
// Accept any type which can be transformed into ZBytes
let mut hashmap = HashMap::new();
hashmap.insert(String::from("key1"), String::from("value1"));
hashmap.insert(String::from("key2"), String::from("value2"));
let the_attachment = ZBytes::from(&hashmap);
// Difference 2: no with_attachment()
publisher
    .put(payload)
    .attachment(the_attachment)
    .await
    .unwrap();
API changes in Query & Queryable
Query and Queryable have been slightly reworked.
For the API replying to a Query from a Queryable declared on a session:
The reply function has been split into three separate functions, depending on the type of reply the user wants to send.
- Zenoh 0.11.x
let reply_ok = Ok(Sample::new(key_expr.clone(), payload.clone())); // Success
query.reply(reply_ok).res().await.unwrap();
// or
let reply_err = Err(Value::from(payload.clone())); // Failure
query.reply(reply_err).res().await.unwrap();
- Zenoh 1.0.0
// No need to send a Result
// For sending a successful reply to a Query
query.reply(key_expr.clone(), payload.clone()).await.unwrap(); // Success
// For sending an error reply to a Query
query.reply_err(payload.clone()).await.unwrap(); // Failure
// For sending a delete reply to a Query (Sample Kind = Delete)
query.reply_del(key_expr.clone()).await.unwrap(); // Delete (Success)
For how a Get Query receives the reply:
Use result() on the Reply to get a Result containing the &Sample, or into_result to take ownership of the Sample.
Ok variant replies return a Sample.
Err variant replies return a ReplyError.
- Zenoh 0.11.x
while let Ok(reply) = replies.recv_async().await {
    match reply.sample { // sample should be Result<Sample, Value>
        Ok(sample) => println!(
            ">> Received ('{}': '{}')",
            sample.key_expr.as_str(),
            sample.value,
        ),
        Err(value_err) => println!("{}", String::try_from(&value_err).unwrap()),
    }
}
- Zenoh 1.0.0
while let Ok(reply) = replies.recv_async().await {
    // Difference 1: using result() to get Result<&Sample, &ReplyError>
    match reply.result() {
        Ok(sample) => {
            println!(
                ">> Received ('{}': '{}')",
                sample.key_expr().as_str(),
                // Difference 2: payload() instead of value
                sample.payload().deserialize::<String>().unwrap()
            );
        }
        // Difference 3: ReplyError instead of Value
        Err(err) => {
            println!("{}", err.payload().deserialize::<String>().unwrap());
        }
    }
}
We have also added the ability to get the underlying handlers from Queryables, so that users have direct access to the receiver of the data channel.
// `mut` is needed to call handler_mut() below
let mut queryable = session
    .declare_queryable(&key_expr)
    .await
    .unwrap();
let handler: &Receiver<Query> = queryable.handler();
// or mutable handler
let mut_handler: &mut Receiver<Query> = queryable.handler_mut();
Use accessors to get private members
We now encapsulate struct members, so they can no longer be accessed directly. The only way to access a struct's fields is through the associated getter methods. Let’s take the subscriber as an example.
- Zenoh 0.11.x
while let Ok(sample) = subscriber.recv_async().await {
    println!(
        ">> [Subscriber] Received {} ('{}': '{}')",
        sample.kind,
        sample.key_expr.as_str(),
        sample.value
    );
}
- Zenoh 1.0.0
while let Ok(sample) = subscriber.recv_async().await {
    println!(
        ">> [Subscriber] Received {} ('{}': '{:?}')",
        sample.kind(),
        sample.key_expr().as_str(),
        sample.payload() // Ignore the deserialization
    );
}
Support RingChannel to receive data
Besides using a callback to receive data, we can also receive data from the default FIFO channel. However, sometimes we only care about the latest data and want to discard the oldest data.
We can use RingChannel to get this behaviour.
You can take a look at the complete code in examples/examples/z_pull.rs.
let subscriber = session
    .declare_subscriber(&key_expr)
    .with(RingChannel::new(size))
    .await
    .unwrap();
⚠️ Note: We no longer support Pull mode in Zenoh.
To get the same behavior as a Zenoh 0.11.0 PullSubscriber, please use a RingChannel; an example is given in z_pull.rs.
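The "keep only the latest data" policy can be sketched with a std-only ring buffer. The Ring type below is hypothetical and is not Zenoh's RingChannel; it only illustrates the drop-oldest behaviour that distinguishes a ring from a FIFO queue that keeps everything.

```rust
use std::collections::VecDeque;

/// Hypothetical fixed-capacity buffer that drops the oldest item when full.
/// It only illustrates the policy; it is NOT Zenoh's `RingChannel`.
pub struct Ring<T> {
    capacity: usize,
    items: VecDeque<T>,
}

impl<T> Ring<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Ring {
            capacity,
            items: VecDeque::with_capacity(capacity),
        }
    }

    /// Pushes a new item, evicting the oldest one if the buffer is full.
    pub fn push(&mut self, item: T) {
        if self.items.len() == self.capacity {
            self.items.pop_front();
        }
        self.items.push_back(item);
    }

    /// Takes the oldest item still buffered, if any.
    pub fn pull(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}
```

A consumer that pulls only occasionally therefore sees at most the last capacity items, which is the behaviour a former PullSubscriber user typically wants.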
Timestamps
We now tie timestamp generation to a Zenoh session, with the timestamp inheriting the ZenohID of the session.
Note that a Zenoh session will only be able to generate a timestamp if the timestamping configuration option is enabled.
- Zenoh 0.11.x
let timestamp: Timestamp = zenoh::time::new_reception_timestamp();
- Zenoh 1.0.0
let session: Session = zenoh::open(config).await.unwrap();
// If the `timestamping` configuration is disabled, this call will return `None`.
let timestamp: Option<Timestamp> = session.new_timestamp();
This will affect user-created plugins and applications that need to generate timestamps.
Feature Flags
Removed:
complete_n: due to a legacy code cleanup
Storages
Zenoh 1.0.0 introduced the possibility for Zenoh nodes configured in a mode other than router to load plugins.
A somewhat implicit assumption that dictated the behaviour of storages is that the Zenoh node loading them has to add a timestamp to any received publication that does not have one. This functionality is controlled by the timestamping configuration option.
Until Zenoh 1.0.0 this assumption held true, as only a router could load storages and the default configuration for a router enables timestamping. However, in Zenoh 1.0.0, nodes configured in client and peer mode can load storages, and their default configuration disables timestamping.
⚠️ The storage-manager will fail to launch if the timestamping configuration option is disabled.