Source code

Revision control

Copy as Markdown

Other Tools

// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use std::collections::{HashMap, HashSet, VecDeque};
use neqo_common::{qdebug, qerror, qlog::NeqoQlog, qtrace, Header};
use neqo_transport::{Connection, Error as TransportError, StreamId};
use crate::{
decoder_instructions::{DecoderInstruction, DecoderInstructionReader},
encoder_instructions::EncoderInstruction,
header_block::HeaderEncoder,
qlog,
qpack_send_buf::QpackData,
reader::ReceiverConnWrapper,
stats::Stats,
table::{HeaderTable, LookupResult, ADDITIONAL_TABLE_ENTRY_SIZE},
Error, QpackSettings, Res,
};
/// Stream type value that is written as a varint at the very start of the local encoder
/// stream, before any encoder instruction (see `QPackEncoder::send_encoder_updates`).
pub const QPACK_UNI_STREAM_TYPE_ENCODER: u64 = 0x2;
/// Lifecycle of the local (sending) encoder stream.
#[derive(Debug, PartialEq)]
enum LocalStreamState {
/// No stream id has been registered yet.
NoStream,
/// A stream id is known, but the stream type has not been sent on it yet.
Uninitialized(StreamId),
/// The stream type has been sent on the stream.
Initialized(StreamId),
}
impl LocalStreamState {
    /// Returns the id of the local stream, or `None` while no stream has been registered.
    pub fn stream_id(&self) -> Option<StreamId> {
        match *self {
            Self::Uninitialized(id) | Self::Initialized(id) => Some(id),
            Self::NoStream => None,
        }
    }
}
/// QPACK encoder state: the header table, the local encoder stream and the bookkeeping for
/// header blocks that reference dynamic table entries.
#[derive(Debug)]
pub struct QPackEncoder {
/// Header table used for lookups and insertions.
table: HeaderTable,
/// Local upper bound for the table capacity (`max_table_size_encoder` from the settings).
max_table_size: u64,
/// Set to `capacity / 32` whenever a capacity change is sent; handed to `HeaderEncoder`.
max_entries: u64,
/// Reader for decoder instructions received from the peer.
instruction_reader: DecoderInstructionReader,
/// State of the local encoder stream.
local_stream: LocalStreamState,
/// Limit on the number of blocked streams, as set by `set_max_blocked_streams`.
max_blocked_streams: u16,
// Remember header blocks that are referring to dynamic table.
// There can be multiple header blocks in one stream, headers, trailer, push stream request,
// etc. This HashMap maps a stream ID to a list of header blocks. Each header block is a
// list of referenced dynamic table entries.
unacked_header_blocks: HashMap<StreamId, VecDeque<HashSet<u64>>>,
/// Number of streams currently counted as blocked.
blocked_stream_cnt: u16,
/// Passed to instruction marshalling and to `HeaderEncoder` to select Huffman encoding.
use_huffman: bool,
/// Capacity requested through `set_max_capacity` that has not been sent to the peer yet.
next_capacity: Option<u64>,
/// Counters returned by `stats()`.
stats: Stats,
}
impl QPackEncoder {
/// Creates an encoder without a local stream, with no pending capacity change and with a
/// blocked-stream limit of zero.
///
/// `qpack_settings.max_table_size_encoder` becomes the local upper bound for the table
/// capacity; `use_huffman` is forwarded to every instruction and header block that is encoded.
#[must_use]
pub fn new(qpack_settings: &QpackSettings, use_huffman: bool) -> Self {
    let max_table_size = qpack_settings.max_table_size_encoder;
    Self {
        table: HeaderTable::new(true),
        instruction_reader: DecoderInstructionReader::new(),
        local_stream: LocalStreamState::NoStream,
        unacked_header_blocks: HashMap::new(),
        stats: Stats::default(),
        max_table_size,
        use_huffman,
        next_capacity: None,
        max_entries: 0,
        max_blocked_streams: 0,
        blocked_stream_cnt: 0,
    }
}
/// Records the table capacity allowed by the peer. The value is received as a
/// `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting parameter.
///
/// The requested capacity is limited to the local `max_table_size` and stored as a pending
/// change; the table itself is only resized once the capacity instruction has been sent (see
/// `send_encoder_updates`). Nothing is recorded when `cap` equals the current table capacity.
///
/// # Errors
///
/// `EncoderStream` if `cap` is larger than `2^30 - 1`.
pub fn set_max_capacity(&mut self, cap: u64) -> Res<()> {
    const LARGEST_ALLOWED: u64 = (1 << 30) - 1;
    if cap > LARGEST_ALLOWED {
        return Err(Error::EncoderStream);
    }
    let current = self.table.capacity();
    if cap == current {
        return Ok(());
    }
    qdebug!(
        [self],
        "Set max capacity to new capacity:{} old:{} max_table_size={}.",
        cap,
        current,
        self.max_table_size,
    );
    // Never go above what the local settings allow.
    let limited = self.max_table_size.min(cap);
    self.change_capacity(limited);
    Ok(())
}
/// This function is use for setting encoders max blocked streams. The value is received as
/// a `SETTINGS_QPACK_BLOCKED_STREAMS` setting parameter.
///
/// # Errors
///
/// `EncoderStream` if value is too big.
pub fn set_max_blocked_streams(&mut self, blocked_streams: u64) -> Res<()> {
self.max_blocked_streams = u16::try_from(blocked_streams).or(Err(Error::EncoderStream))?;
Ok(())
}
/// Reads and applies all decoder instructions currently available on `stream_id`.
///
/// # Errors
///
/// `ClosedCriticalStream` is passed through unchanged; every other error is reported as
/// `DecoderStream`.
pub fn receive(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<()> {
    match self.read_instructions(conn, stream_id) {
        Ok(()) => Ok(()),
        Err(e) => Err(map_error(&e)),
    }
}
fn read_instructions(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<()> {
qdebug!([self], "read a new instraction");
loop {
let mut recv = ReceiverConnWrapper::new(conn, stream_id);
match self.instruction_reader.read_instructions(&mut recv) {
Ok(instruction) => self.call_instruction(instruction, conn.qlog_mut())?,
Err(Error::NeedMoreData) => break Ok(()),
Err(e) => break Err(e),
}
}
}
/// Recomputes `blocked_stream_cnt` from scratch: a stream is counted when any of its unacked
/// header blocks references an entry index that is not below the acked insert count.
fn recalculate_blocked_streams(&mut self) {
    let acked_inserts_cnt = self.table.get_acked_inserts_cnt();
    let mut blocked = 0;
    for hb_list in self.unacked_header_blocks.values() {
        debug_assert!(!hb_list.is_empty());
        let still_blocked = hb_list
            .iter()
            .flat_map(|refs| refs.iter())
            .any(|entry| *entry >= acked_inserts_cnt);
        if still_blocked {
            blocked += 1;
        }
    }
    self.blocked_stream_cnt = blocked;
}
#[allow(clippy::map_err_ignore)]
fn insert_count_instruction(&mut self, increment: u64) -> Res<()> {
self.table
.increment_acked(increment)
.map_err(|_| Error::DecoderStream)?;
self.recalculate_blocked_streams();
Ok(())
}
/// Handles a header acknowledgement for `stream_id`.
///
/// The oldest unacked header block of the stream (blocks are pushed to the front, so the
/// oldest one sits at the back) is removed and the references it held are released. If that
/// block referenced entries at or above the acked insert count, the acked insert count is
/// advanced past the highest such entry.
fn header_ack(&mut self, stream_id: StreamId) {
    self.stats.header_acks_recv += 1;
    let mut new_acked = self.table.get_acked_inserts_cnt();
    if let Some(hb_list) = self.unacked_header_blocks.get_mut(&stream_id) {
        match hb_list.pop_back() {
            Some(ref_list) => {
                for entry in ref_list {
                    self.table.remove_ref(entry);
                    new_acked = new_acked.max(entry + 1);
                }
            }
            None => debug_assert!(false, "We should have at least one header block."),
        }
        // Do not keep an empty list around for the stream.
        if hb_list.is_empty() {
            self.unacked_header_blocks.remove(&stream_id);
        }
    }
    let acked = self.table.get_acked_inserts_cnt();
    if new_acked > acked {
        self.insert_count_instruction(new_acked - acked)
            .expect("This should neve happen");
    }
}
/// Handles a stream cancellation for `stream_id`.
///
/// All unacked header blocks of the stream are dropped and their references released. If any
/// of them referenced an entry at or above the acked insert count, the stream was counted as
/// blocked and the counter is decremented.
fn stream_cancellation(&mut self, stream_id: StreamId) {
    self.stats.stream_cancelled_recv += 1;
    let mut was_blocker = false;
    if let Some(hb_list) = self.unacked_header_blocks.remove(&stream_id) {
        debug_assert!(!hb_list.is_empty());
        for ref_list in hb_list {
            for entry in ref_list {
                self.table.remove_ref(entry);
                if !was_blocker && entry >= self.table.get_acked_inserts_cnt() {
                    was_blocker = true;
                }
            }
        }
    }
    if was_blocker {
        debug_assert!(self.blocked_stream_cnt > 0);
        self.blocked_stream_cnt -= 1;
    }
}
/// Dispatches a single decoder instruction to its handler.
///
/// Only an insert count increment can fail; it is also the only instruction logged to qlog
/// here.
fn call_instruction(
    &mut self,
    instruction: DecoderInstruction,
    qlog: &mut NeqoQlog,
) -> Res<()> {
    qdebug!([self], "call intruction {:?}", instruction);
    match instruction {
        DecoderInstruction::InsertCountIncrement { increment } => {
            qlog::qpack_read_insert_count_increment_instruction(
                qlog,
                increment,
                &increment.to_be_bytes(),
            );
            self.insert_count_instruction(increment)?;
        }
        DecoderInstruction::HeaderAck { stream_id } => self.header_ack(stream_id),
        DecoderInstruction::StreamCancellation { stream_id } => {
            self.stream_cancellation(stream_id);
        }
        DecoderInstruction::NoInstruction => {}
    }
    Ok(())
}
/// Inserts a new entry into a table and sends the corresponding instruction to a peer. An entry
/// is added only if it is possible to send the corresponding instruction immediately, i.e.
/// the encoder stream is not blocked by the flow control (or stream internal buffer(this is
/// very unlikely)).
///
/// # Errors
///
/// `EncoderStreamBlocked` if the encoder stream is blocked by the flow control.
/// `DynamicTableFull` if the dynamic table does not have enough space for the entry.
/// The function can return transport errors: `InvalidStreamId`, `InvalidInput` and
/// `FinalSizeError`.
///
/// # Panics
///
/// When the insertion fails (it should not).
pub fn send_and_insert(
&mut self,
conn: &mut Connection,
name: &[u8],
value: &[u8],
) -> Res<u64> {
qdebug!([self], "insert {:?} {:?}.", name, value);
let entry_size = name.len() + value.len() + ADDITIONAL_TABLE_ENTRY_SIZE;
if !self.table.insert_possible(entry_size) {
return Err(Error::DynamicTableFull);
}
let mut buf = QpackData::default();
EncoderInstruction::InsertWithNameLiteral { name, value }
.marshal(&mut buf, self.use_huffman);
let stream_id = self.local_stream.stream_id().ok_or(Error::Internal)?;
let sent = conn
.stream_send_atomic(stream_id, &buf)
.map_err(|e| map_stream_send_atomic_error(&e))?;
if !sent {
return Err(Error::EncoderStreamBlocked);
}
self.stats.dynamic_table_inserts += 1;
match self.table.insert(name, value) {
Ok(inx) => Ok(inx),
Err(e) => {
debug_assert!(false);
Err(e)
}
}
}
/// Remembers `value` as the pending table capacity. The table is not touched here; the change
/// is applied by `maybe_send_change_capacity` once the capacity instruction has been sent.
fn change_capacity(&mut self, value: u64) {
qdebug!([self], "change capacity: {}", value);
self.next_capacity = Some(value);
}
/// Sends the pending capacity change, if there is one, and applies it to the table.
///
/// The pending value is only cleared after the instruction has been written and the table
/// has been resized, so a failed attempt is retried on the next call.
///
/// # Errors
///
/// `DynamicTableFull` if the capacity should shrink but the table cannot evict enough.
/// `EncoderStreamBlocked` if the instruction could not be written atomically.
/// `InternalError` if the table rejects the new capacity after the check passed.
/// Errors from sending on the stream are propagated.
fn maybe_send_change_capacity(
    &mut self,
    conn: &mut Connection,
    stream_id: StreamId,
) -> Res<()> {
    let cap = match self.next_capacity {
        Some(cap) => cap,
        None => return Ok(()),
    };
    // A reduction is only possible if enough space can be made free for it.
    let shrinking = cap < self.table.capacity();
    if shrinking && !self.table.can_evict_to(cap) {
        return Err(Error::DynamicTableFull);
    }
    let mut buf = QpackData::default();
    EncoderInstruction::Capacity { value: cap }.marshal(&mut buf, self.use_huffman);
    let sent = conn.stream_send_atomic(stream_id, &buf)?;
    if !sent {
        return Err(Error::EncoderStreamBlocked);
    }
    if self.table.set_capacity(cap).is_err() {
        debug_assert!(
            false,
            "can_evict_to should have checked and make sure this operation is possible"
        );
        return Err(Error::InternalError);
    }
    self.max_entries = cap / 32;
    self.next_capacity = None;
    Ok(())
}
/// Sends pending data on the encoder stream: the stream type if it has not been sent yet,
/// followed by a pending capacity change, if any.
///
/// When no local stream has been added, an error is logged and `Ok(())` is returned.
///
/// # Errors
///
/// `EncoderStreamBlocked` if the stream type or the capacity instruction could not be
/// written atomically; otherwise any error produced while sending or while changing the
/// capacity.
pub fn send_encoder_updates(&mut self, conn: &mut Connection) -> Res<()> {
    let stream_id = match self.local_stream {
        LocalStreamState::NoStream => {
            qerror!("Send call but there is no stream yet.");
            return Ok(());
        }
        LocalStreamState::Uninitialized(stream_id) => {
            // The stream type must be the first thing written to the stream.
            let mut buf = QpackData::default();
            buf.encode_varint(QPACK_UNI_STREAM_TYPE_ENCODER);
            if !conn.stream_send_atomic(stream_id, &buf[..])? {
                return Err(Error::EncoderStreamBlocked);
            }
            self.local_stream = LocalStreamState::Initialized(stream_id);
            stream_id
        }
        LocalStreamState::Initialized(stream_id) => stream_id,
    };
    self.maybe_send_change_capacity(conn, stream_id)
}
/// Returns `true` if `stream_id` has an unacked header block whose highest referenced entry
/// index is not below the acked insert count.
fn is_stream_blocker(&self, stream_id: StreamId) -> bool {
    let hb_list = match self.unacked_header_blocks.get(&stream_id) {
        Some(hb_list) => hb_list,
        None => return false,
    };
    debug_assert!(!hb_list.is_empty());
    hb_list
        .iter()
        .flatten()
        .max()
        .map_or(false, |max_ref| *max_ref >= self.table.get_acked_inserts_cnt())
}
/// Encodes a list of headers into a header block for `stream_id`.
///
/// Pending encoder stream updates are sent first. Each header is then encoded, in order of
/// preference, as a reference to a table entry found by lookup, as a reference to an entry
/// that is newly inserted into the dynamic table, or as a literal. New insertions are only
/// attempted while the stream is allowed to block and the encoder stream accepts data; after
/// the first failed write or insertion the remaining unmatched headers become literals.
///
/// If the block references dynamic table entries, the set of referenced entries is recorded
/// as an unacked header block of the stream, and the stream is counted as blocked when it
/// newly references an entry that the decoder has not acknowledged yet.
///
/// This function does not return an error: failures on the encoder stream are ignored here
/// and only cause a fallback to literal encoding.
///
/// # Panics
///
/// In debug builds, if an internal invariant is violated (a programming error).
pub fn encode_header_block(
    &mut self,
    conn: &mut Connection,
    h: &[Header],
    stream_id: StreamId,
) -> HeaderEncoder {
    qdebug!([self], "encoding headers.");
    let mut encoder_blocked = false;
    // Try to send capacity instructions if present.
    if self.send_encoder_updates(conn).is_err() {
        // This code doesn't try to deal with errors, it just tries
        // to write to the encoder stream AND if it can't uses
        // literal instructions.
        // The errors can be:
        //   1) `EncoderStreamBlocked` - this is an error that can occur.
        //   2) `InternalError` - this is unexpected error.
        //   3) `ClosedCriticalStream` - this is error that should close the HTTP/3 session.
        // The last 2 errors are ignored here and will be picked up
        // by the main loop.
        encoder_blocked = true;
    }
    let mut encoded_h =
        HeaderEncoder::new(self.table.base(), self.use_huffman, self.max_entries);
    let stream_is_blocker = self.is_stream_blocker(stream_id);
    // A stream that is already blocked may reference unacked entries without counting again.
    let can_block = self.blocked_stream_cnt < self.max_blocked_streams || stream_is_blocker;
    let mut ref_entries = HashSet::new();
    for iter in h {
        // Borrow the bytes straight from the header; no copy is needed because the header
        // outlives this iteration and is independent of `self` and `conn`.
        let name = iter.name().as_bytes();
        let value = iter.value().as_bytes();
        qtrace!("encoding {:x?} {:x?}.", name, value);
        if let Some(LookupResult {
            index,
            static_table,
            value_matches,
        }) = self.table.lookup(name, value, can_block)
        {
            qtrace!(
                [self],
                "found a {} entry, value-match={}",
                if static_table { "static" } else { "dynamic" },
                value_matches
            );
            if value_matches {
                if static_table {
                    encoded_h.encode_indexed_static(index);
                } else {
                    encoded_h.encode_indexed_dynamic(index);
                }
            } else {
                encoded_h.encode_literal_with_name_ref(static_table, index, value);
            }
            // Take one table reference per distinct dynamic entry used by this block.
            if !static_table && ref_entries.insert(index) {
                self.table.add_ref(index);
            }
        } else if can_block && !encoder_blocked {
            // Insert using an InsertWithNameLiteral instruction. This entry name does not match
            // any name in the tables therefore we cannot use any other
            // instruction.
            if let Ok(index) = self.send_and_insert(conn, name, value) {
                encoded_h.encode_indexed_dynamic(index);
                ref_entries.insert(index);
                self.table.add_ref(index);
            } else {
                // This code doesn't try to deal with errors, it just tries
                // to write to the encoder stream AND if it can't uses
                // literal instructions.
                // The errors can be:
                //   1) `EncoderStreamBlocked` - this is an error that can occur.
                //   2) `DynamicTableFull` - this is an error that can occur.
                //   3) `InternalError` - this is unexpected error.
                //   4) `ClosedCriticalStream` - this is error that should close the HTTP/3
                //      session.
                // The last 2 errors are ignored here and will be picked up
                // by the main loop.
                // As soon as one of the instructions cannot be written or the table is full, do
                // not try again.
                encoder_blocked = true;
                encoded_h.encode_literal_with_name_literal(name, value);
            }
        } else {
            encoded_h.encode_literal_with_name_literal(name, value);
        }
    }
    encoded_h.encode_header_block_prefix();
    if !stream_is_blocker {
        // The streams was not a blocker, check if the stream is a blocker now.
        if let Some(max_ref) = ref_entries.iter().max() {
            if *max_ref >= self.table.get_acked_inserts_cnt() {
                debug_assert!(self.blocked_stream_cnt <= self.max_blocked_streams);
                self.blocked_stream_cnt += 1;
            }
        }
    }
    if !ref_entries.is_empty() {
        // The newest block goes to the front; header acks consume blocks from the back.
        self.unacked_header_blocks
            .entry(stream_id)
            .or_default()
            .push_front(ref_entries);
        self.stats.dynamic_table_references += 1;
    }
    encoded_h
}
/// Registers the id of the newly created local encoder stream. The stream type is sent later,
/// by `send_encoder_updates`.
///
/// # Panics
///
/// If a stream has already been added.
pub fn add_send_stream(&mut self, stream_id: StreamId) {
    match self.local_stream {
        LocalStreamState::NoStream => {
            self.local_stream = LocalStreamState::Uninitialized(stream_id);
        }
        LocalStreamState::Uninitialized(_) | LocalStreamState::Initialized(_) => {
            panic!("Adding multiple local streams")
        }
    }
}
/// Returns a copy of the encoder's statistics counters.
#[must_use]
pub fn stats(&self) -> Stats {
    let snapshot = self.stats.clone();
    snapshot
}
#[must_use]
pub fn local_stream_id(&self) -> Option<StreamId> {
self.local_stream.stream_id()
}
/// Test-only accessor for the number of streams currently counted as blocked.
#[cfg(test)]
fn blocked_stream_cnt(&self) -> u16 {
    let Self {
        blocked_stream_cnt, ..
    } = self;
    *blocked_stream_cnt
}
}
/// Prints the fixed label `QPackEncoder`; used as the prefix of this type's log lines.
impl ::std::fmt::Display for QPackEncoder {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        f.write_str("QPackEncoder")
    }
}
fn map_error(err: &Error) -> Error {
if *err == Error::ClosedCriticalStream {
Error::ClosedCriticalStream
} else {
Error::DecoderStream
}
}
fn map_stream_send_atomic_error(err: &TransportError) -> Error {
match err {
TransportError::InvalidStreamId | TransportError::FinalSizeError => {
Error::ClosedCriticalStream
}
_ => {
debug_assert!(false, "Unexpected error");
Error::InternalError
}
}
}
#[cfg(test)]
mod tests {
use std::mem;
use neqo_transport::{ConnectionParameters, StreamId, StreamType};
use test_fixture::{default_client, default_server, handshake, new_server, now, DEFAULT_ALPN};
use super::{Connection, Error, Header, QPackEncoder, Res};
use crate::QpackSettings;
/// Test fixture: an encoder together with a connected pair of connections.
struct TestEncoder {
/// The encoder under test.
encoder: QPackEncoder,
/// Stream created on `conn`; registered as the encoder's local stream.
send_stream_id: StreamId,
/// Stream created on `peer_conn`; used to deliver decoder instructions to the encoder.
recv_stream_id: StreamId,
/// Connection the encoder sends on.
conn: Connection,
/// The other end of `conn`.
peer_conn: Connection,
}
impl TestEncoder {
/// Requests a new capacity and immediately tries to send the capacity instruction.
///
/// Panics if the capacity request itself is rejected; returns the result of the send.
pub fn change_capacity(&mut self, capacity: u64) -> Res<()> {
self.encoder.set_max_capacity(capacity).unwrap();
// We will try to really change the table only when we send the change capacity
// instruction.
self.encoder.send_encoder_updates(&mut self.conn)
}
/// Inserts `header: value` into the table, asserting success, and checks that the peer
/// receives exactly `inst` on the encoder stream.
pub fn insert(&mut self, header: &[u8], value: &[u8], inst: &[u8]) {
let res = self.encoder.send_and_insert(&mut self.conn, header, value);
assert!(res.is_ok());
self.send_instructions(inst);
}
/// Encodes `headers` for `stream_id` and checks both the produced header block and the
/// bytes the peer receives on the encoder stream.
pub fn encode_header_block(
&mut self,
stream_id: StreamId,
headers: &[Header],
expected_encoding: &[u8],
inst: &[u8],
) {
let buf = self
.encoder
.encode_header_block(&mut self.conn, headers, stream_id);
assert_eq!(&buf[..], expected_encoding);
self.send_instructions(inst);
}
/// Flushes pending encoder updates, exchanges packets between the two connections and
/// asserts that the peer reads exactly `encoder_instruction` from the encoder stream
/// (without a fin).
pub fn send_instructions(&mut self, encoder_instruction: &[u8]) {
self.encoder.send_encoder_updates(&mut self.conn).unwrap();
// conn -> peer_conn, then feed the peer's response back into conn.
let out = self.conn.process(None, now());
let out2 = self.peer_conn.process(out.as_dgram_ref(), now());
mem::drop(self.conn.process(out2.as_dgram_ref(), now()));
let mut buf = [0_u8; 100];
let (amount, fin) = self
.peer_conn
.stream_recv(self.send_stream_id, &mut buf)
.unwrap();
assert!(!fin);
assert_eq!(buf[..amount], encoder_instruction[..]);
}
}
/// Builds a `TestEncoder`: connects a client and a server, creates one unidirectional stream
/// in each direction and registers the client's stream as the encoder's local stream.
///
/// With `max_data` set, the server is created with that value as its `max_stream_data`
/// parameter for the stream types listed below; otherwise the default server is used.
fn connect_generic(huffman: bool, max_data: Option<u64>) -> TestEncoder {
let mut conn = default_client();
let mut peer_conn = max_data.map_or_else(default_server, |max| {
new_server(
DEFAULT_ALPN,
ConnectionParameters::default()
.max_stream_data(StreamType::UniDi, true, max)
.max_stream_data(StreamType::BiDi, true, max)
.max_stream_data(StreamType::BiDi, false, max),
)
});
handshake(&mut conn, &mut peer_conn);
// create a stream
let recv_stream_id = peer_conn.stream_create(StreamType::UniDi).unwrap();
let send_stream_id = conn.stream_create(StreamType::UniDi).unwrap();
// create an encoder
let mut encoder = QPackEncoder::new(
&QpackSettings {
max_table_size_encoder: 1500,
max_table_size_decoder: 0,
max_blocked_streams: 0,
},
huffman,
);
encoder.add_send_stream(send_stream_id);
TestEncoder {
encoder,
send_stream_id,
recv_stream_id,
conn,
peer_conn,
}
}
/// Builds a `TestEncoder` against the default server, with Huffman encoding as requested.
fn connect(huffman: bool) -> TestEncoder {
connect_generic(huffman, None)
}
/// Builds a Huffman-enabled `TestEncoder` whose peer uses `max_data` as its stream data limit.
fn connect_flow_control(max_data: u64) -> TestEncoder {
connect_generic(true, Some(max_data))
}
/// Sends `decoder_instruction` from the peer on the receive stream, delivers the resulting
/// packet to the encoder's connection and asserts that the encoder reads it without error.
fn recv_instruction(encoder: &mut TestEncoder, decoder_instruction: &[u8]) {
    let stream_id = encoder.recv_stream_id;
    encoder
        .peer_conn
        .stream_send(stream_id, decoder_instruction)
        .unwrap();
    let dgram = encoder.peer_conn.process(None, now());
    mem::drop(encoder.conn.process(dgram.as_dgram_ref(), now()));
    let read_result = encoder
        .encoder
        .read_instructions(&mut encoder.conn, stream_id);
    assert!(read_result.is_ok());
}
// Expected first bytes on the encoder stream: the stream type (0x02) followed by a capacity
// instruction (0x3f plus continuation bytes) for the capacity named in the constant.
const CAP_INSTRUCTION_200: &[u8] = &[0x02, 0x3f, 0xa9, 0x01];
const CAP_INSTRUCTION_60: &[u8] = &[0x02, 0x3f, 0x1d];
const CAP_INSTRUCTION_1000: &[u8] = &[0x02, 0x3f, 0xc9, 0x07];
const CAP_INSTRUCTION_1500: &[u8] = &[0x02, 0x3f, 0xbd, 0x0b];
// ASCII bytes of "content-length".
const HEADER_CONTENT_LENGTH: &[u8] = &[
0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
];
// ASCII bytes of "1234".
const VALUE_1: &[u8] = &[0x31, 0x32, 0x33, 0x34];
// ASCII bytes of "12345".
const VALUE_2: &[u8] = &[0x31, 0x32, 0x33, 0x34, 0x35];
// HEADER_CONTENT_LENGTH and VALUE_1 encoded by instruction insert_with_name_literal.
const HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL: &[u8] = &[
0x4e, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
0x04, 0x31, 0x32, 0x33, 0x34,
];
// HEADER_CONTENT_LENGTH and VALUE_2 encoded by instruction insert_with_name_literal.
const HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL: &[u8] = &[
0x4e, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
0x05, 0x31, 0x32, 0x33, 0x34, 0x35,
];
// Indexed Header Field that refers to the first entry in the dynamic table.
const ENCODE_INDEXED_REF_DYNAMIC: &[u8] = &[0x02, 0x00, 0x80];
// Stream ids used by the header block tests.
const STREAM_1: StreamId = StreamId::new(1);
const STREAM_2: StreamId = StreamId::new(2);
// Decoder instructions fed to the encoder: header acks for streams 1 and 2, and a stream
// cancellation for stream 1.
const HEADER_ACK_STREAM_ID_1: &[u8] = &[0x81];
const HEADER_ACK_STREAM_ID_2: &[u8] = &[0x82];
const STREAM_CANCELED_ID_1: &[u8] = &[0x41];
// Test insert_with_name_literal: the insertion fails because there is not enough space in the
// table (no capacity has been set).
#[test]
fn test_insert_with_name_literal_1() {
    let mut encoder = connect(false);
    // Try to insert "content-length: 1234".
    let outcome =
        encoder
            .encoder
            .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
    assert_eq!(outcome, Err(Error::DynamicTableFull));
    // Only the stream type is expected on the encoder stream.
    encoder.send_instructions(&[0x02]);
}
// Test insert_with_name_literal: the insertion succeeds once the table has capacity.
#[test]
fn test_insert_with_name_literal_2() {
    let mut encoder = connect(false);
    encoder.encoder.set_max_capacity(200).unwrap();
    // The change capacity instruction is sent first.
    encoder.send_instructions(CAP_INSTRUCTION_200);
    // Insert "content-length: 1234" and check the instruction seen by the peer.
    encoder.insert(
        HEADER_CONTENT_LENGTH,
        VALUE_1,
        HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL,
    );
}
// Setting a capacity results in a change capacity instruction on the encoder stream.
#[test]
fn test_change_capacity() {
    let mut encoder = connect(false);
    encoder.encoder.set_max_capacity(200).unwrap();
    encoder.send_instructions(CAP_INSTRUCTION_200);
}
/// One step of a table-driven header block test.
struct TestElement {
/// Headers to encode.
pub headers: Vec<Header>,
/// Expected encoded header block.
pub header_block: &'static [u8],
/// Expected bytes on the encoder stream after encoding `headers`.
pub encoder_inst: &'static [u8],
}
// Table-driven test of header block encoding without Huffman encoding. All cases run in
// order against the same encoder and stream, so later cases see entries inserted earlier.
#[test]
fn test_header_block_encoder_non() {
let test_cases: [TestElement; 6] = [
// test a header with ref to static - encode_indexed
TestElement {
headers: vec![Header::new(":method", "GET")],
header_block: &[0x00, 0x00, 0xd1],
encoder_inst: &[],
},
// test encode_literal_with_name_ref
TestElement {
headers: vec![Header::new(":path", "/somewhere")],
header_block: &[
0x00, 0x00, 0x51, 0x0a, 0x2f, 0x73, 0x6f, 0x6d, 0x65, 0x77, 0x68, 0x65, 0x72,
0x65,
],
encoder_inst: &[],
},
// test adding a new header and encode_post_base_index, also test
// fix_header_block_prefix
TestElement {
headers: vec![Header::new("my-header", "my-value")],
header_block: &[0x02, 0x80, 0x10],
encoder_inst: &[
0x49, 0x6d, 0x79, 0x2d, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x08, 0x6d, 0x79,
0x2d, 0x76, 0x61, 0x6c, 0x75, 0x65,
],
},
// test encode_indexed with a ref to dynamic table.
TestElement {
headers: vec![Header::new("my-header", "my-value")],
header_block: ENCODE_INDEXED_REF_DYNAMIC,
encoder_inst: &[],
},
// test encode_literal_with_name_ref.
TestElement {
headers: vec![Header::new("my-header", "my-value2")],
header_block: &[
0x02, 0x00, 0x40, 0x09, 0x6d, 0x79, 0x2d, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32,
],
encoder_inst: &[],
},
// test multiple headers
TestElement {
headers: vec![
Header::new(":method", "GET"),
Header::new(":path", "/somewhere"),
Header::new(":authority", "example.com"),
Header::new(":scheme", "https"),
],
header_block: &[
0x00, 0x01, 0xd1, 0x51, 0x0a, 0x2f, 0x73, 0x6f, 0x6d, 0x65, 0x77, 0x68, 0x65,
0x72, 0x65, 0x50, 0x0b, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x63,
0x6f, 0x6d, 0xd7,
],
encoder_inst: &[],
},
];
let mut encoder = connect(false);
encoder.encoder.set_max_blocked_streams(100).unwrap();
encoder.encoder.set_max_capacity(200).unwrap();
// test the change capacity instruction.
encoder.send_instructions(CAP_INSTRUCTION_200);
for t in &test_cases {
let buf = encoder
.encoder
.encode_header_block(&mut encoder.conn, &t.headers, STREAM_1);
assert_eq!(&buf[..], t.header_block);
encoder.send_instructions(t.encoder_inst);
}
}
// Table-driven test of header block encoding with Huffman encoding enabled. All cases run in
// order against the same encoder and stream, so later cases see entries inserted earlier.
#[test]
fn test_header_block_encoder_huffman() {
let test_cases: [TestElement; 6] = [
// test a header with ref to static - encode_indexed
TestElement {
headers: vec![Header::new(":method", "GET")],
header_block: &[0x00, 0x00, 0xd1],
encoder_inst: &[],
},
// test encode_literal_with_name_ref
TestElement {
headers: vec![Header::new(":path", "/somewhere")],
header_block: &[
0x00, 0x00, 0x51, 0x87, 0x61, 0x07, 0xa4, 0xbe, 0x27, 0x2d, 0x85,
],
encoder_inst: &[],
},
// test adding a new header and encode_post_base_index, also test
// fix_header_block_prefix
TestElement {
headers: vec![Header::new("my-header", "my-value")],
header_block: &[0x02, 0x80, 0x10],
encoder_inst: &[
0x67, 0xa7, 0xd2, 0xd3, 0x94, 0x72, 0x16, 0xcf, 0x86, 0xa7, 0xd2, 0xdd, 0xc7,
0x45, 0xa5,
],
},
// test encode_indexed with a ref to dynamic table.
TestElement {
headers: vec![Header::new("my-header", "my-value")],
header_block: ENCODE_INDEXED_REF_DYNAMIC,
encoder_inst: &[],
},
// test encode_literal_with_name_ref.
TestElement {
headers: vec![Header::new("my-header", "my-value2")],
header_block: &[
0x02, 0x00, 0x40, 0x87, 0xa7, 0xd2, 0xdd, 0xc7, 0x45, 0xa5, 0x17,
],
encoder_inst: &[],
},
// test multiple headers
TestElement {
headers: vec![
Header::new(":method", "GET"),
Header::new(":path", "/somewhere"),
Header::new(":authority", "example.com"),
Header::new(":scheme", "https"),
],
header_block: &[
0x00, 0x01, 0xd1, 0x51, 0x87, 0x61, 0x07, 0xa4, 0xbe, 0x27, 0x2d, 0x85, 0x50,
0x88, 0x2f, 0x91, 0xd3, 0x5d, 0x05, 0x5c, 0x87, 0xa7, 0xd7,
],
encoder_inst: &[],
},
];
let mut encoder = connect(true);
encoder.encoder.set_max_blocked_streams(100).unwrap();
encoder.encoder.set_max_capacity(200).unwrap();
// test the change capacity instruction.
encoder.send_instructions(CAP_INSTRUCTION_200);
for t in &test_cases {
let buf = encoder
.encoder
.encode_header_block(&mut encoder.conn, &t.headers, STREAM_1);
assert_eq!(&buf[..], t.header_block);
encoder.send_instructions(t.encoder_inst);
}
}
// Test that an insertion is blocked while waiting for an insert count increment: with a
// capacity of 60 a second entry only fits after the first one can be evicted.
#[test]
fn test_insertion_blocked_on_insert_count_feedback() {
let mut encoder = connect(false);
encoder.encoder.set_max_capacity(60).unwrap();
// test the change capacity instruction.
encoder.send_instructions(CAP_INSTRUCTION_60);
// insert "content-length: 1234"
let res =
encoder
.encoder
.send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
assert!(res.is_ok());
encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
// insert "content-length: 12345" which will fail because the entry in the table cannot be
// evicted.
let res =
encoder
.encoder
.send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
assert!(res.is_err());
encoder.send_instructions(&[]);
// receive an insert count increment.
recv_instruction(&mut encoder, &[0x01]);
// insert "content-length: 12345" again; it will succeed.
let res =
encoder
.encoder
.send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
assert!(res.is_ok());
encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);
}
// Test inserts block on waiting for acks
// test the table insertion is blocked:
// `wait == 0` - waiting for a header ack
// any other value - waiting for a stream cancel.
fn test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(wait: u8) {
let mut encoder = connect(false);
assert!(encoder.encoder.set_max_capacity(60).is_ok());
// test the change capacity instruction.
encoder.send_instructions(CAP_INSTRUCTION_60);
// insert "content-length: 1234"
let res =
encoder
.encoder
.send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
assert!(res.is_ok());
encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
// receive an insert count increment.
recv_instruction(&mut encoder, &[0x01]);
// send a header block
let buf = encoder.encoder.encode_header_block(
&mut encoder.conn,
&[Header::new("content-length", "1234")],
STREAM_1,
);
assert_eq!(&buf[..], ENCODE_INDEXED_REF_DYNAMIC);
encoder.send_instructions(&[]);
// insert "content-length: 12345" which will fail because the entry in the table cannot be
// evicted (it is still referenced by the header block above).
let res =
encoder
.encoder
.send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
assert!(res.is_err());
encoder.send_instructions(&[]);
if wait == 0 {
// receive a header_ack.
recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
} else {
// receive a stream canceled
recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);
}
// insert "content-length: 12345" again; it will succeed.
let res =
encoder
.encoder
.send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
assert!(res.is_ok());
encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);
}
// The blocked insertion is released by a header ack.
#[test]
fn test_header_ack() {
    const WAIT_FOR_HEADER_ACK: u8 = 0;
    test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(WAIT_FOR_HEADER_ACK);
}
// The blocked insertion is released by a stream cancellation.
#[test]
fn test_stream_canceled() {
    const WAIT_FOR_STREAM_CANCEL: u8 = 1;
    test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(WAIT_FOR_STREAM_CANCEL);
}
/// Asserts that the third byte of `buf` has the bit pattern `10xxxxxx`.
///
/// Panics if `buf` has fewer than three bytes.
fn assert_is_index_to_dynamic(buf: &[u8]) {
    let first_field_byte = buf[2];
    assert_eq!(first_field_byte & 0xc0, 0x80);
}
/// Asserts that the high nibble of the third byte of `buf` is `0001`.
///
/// Panics if `buf` has fewer than three bytes.
fn assert_is_index_to_dynamic_post(buf: &[u8]) {
    let first_field_byte = buf[2];
    assert_eq!(first_field_byte & 0xf0, 0x10);
}
/// Asserts that the high nibble of the third byte of `buf` is `0101`.
///
/// Panics if `buf` has fewer than three bytes.
fn assert_is_index_to_static_name_only(buf: &[u8]) {
    let first_field_byte = buf[2];
    assert_eq!(first_field_byte & 0xf0, 0x50);
}
/// Asserts that the high nibble of the third byte of `buf` is `0010`.
///
/// Panics if `buf` has fewer than three bytes.
fn assert_is_literal_value_literal_name(buf: &[u8]) {
    let first_field_byte = buf[2];
    assert_eq!(first_field_byte & 0xf0, 0x20);
}
// With a limit of one blocked stream, only the stream that is already blocked may reference
// an unacked dynamic table entry.
#[test]
fn max_block_streams1() {
    let mut encoder = connect(false);
    encoder.encoder.set_max_capacity(60).unwrap();
    // Change capacity to 60.
    encoder.send_instructions(CAP_INSTRUCTION_60);
    // Insert "content-length: 1234".
    encoder.insert(
        HEADER_CONTENT_LENGTH,
        VALUE_1,
        HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL,
    );
    encoder.encoder.set_max_blocked_streams(1).unwrap();
    let headers = [Header::new("content-length", "1234")];
    // The first header block refers to the unacked entry and blocks its stream.
    let first = encoder
        .encoder
        .encode_header_block(&mut encoder.conn, &headers, STREAM_1);
    assert_is_index_to_dynamic(&first);
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
    encoder.send_instructions(&[]);
    // A block on another stream must not use the dynamic entry, because that would exceed
    // the max_blocked_streams limit.
    let second = encoder
        .encoder
        .encode_header_block(&mut encoder.conn, &headers, STREAM_2);
    assert_is_index_to_static_name_only(&second);
    encoder.send_instructions(&[]);
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
    // Another block on the already blocked stream can still use the entry.
    let third = encoder
        .encoder
        .encode_header_block(&mut encoder.conn, &headers, STREAM_1);
    assert_is_index_to_dynamic(&third);
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
}
// A stream that is already blocked may reference a second unacked entry even though the
// limit of blocked streams has been reached.
#[test]
fn max_block_streams2() {
let mut encoder = connect(false);
assert!(encoder.encoder.set_max_capacity(200).is_ok());
// change capacity to 200.
encoder.send_instructions(CAP_INSTRUCTION_200);
// insert "content-length: 1234"
let res =
encoder
.encoder
.send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
assert!(res.is_ok());
encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
// insert "content-length: 12345"
let res =
encoder
.encoder
.send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
assert!(res.is_ok());
encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);
encoder.encoder.set_max_blocked_streams(1).unwrap();
// send a header block, it refers to unacked entry.
let buf = encoder.encoder.encode_header_block(
&mut encoder.conn,
&[Header::new("content-length", "1234")],
STREAM_1,
);
assert_is_index_to_dynamic(&buf);
// encode another header block for the same stream that will refer to the second entry
// in the dynamic table.
// This should work because the stream is already a blocked stream.
let buf = encoder.encoder.encode_header_block(
&mut encoder.conn,
&[Header::new("content-length", "12345")],
STREAM_1,
);
assert_is_index_to_dynamic(&buf);
}
// With a limit of one blocked stream, only the stream that is already blocked may create
// new dynamic table entries.
#[test]
fn max_block_streams3() {
let mut encoder = connect(false);
assert!(encoder.encoder.set_max_capacity(200).is_ok());
// change capacity to 200.
encoder.send_instructions(CAP_INSTRUCTION_200);
encoder.encoder.set_max_blocked_streams(1).unwrap();
assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
// send a header block, that creates an new entry and refers to it.
let buf = encoder.encoder.encode_header_block(
&mut encoder.conn,
&[Header::new("name1", "value1")],
STREAM_1,
);
assert_is_index_to_dynamic_post(&buf);
assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
// The next one will not create a new entry because the encoder is on max_blocked_streams
// limit.
let buf = encoder.encoder.encode_header_block(
&mut encoder.conn,
&[Header::new("name2", "value2")],
STREAM_2,
);
assert_is_literal_value_literal_name(&buf);
assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
// another header block to already blocked stream can still create a new entry.
let buf = encoder.encoder.encode_header_block(
&mut encoder.conn,
&[Header::new("name2", "value2")],
STREAM_1,
);
assert_is_index_to_dynamic_post(&buf);
assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
}
// A header ack for the first of two header blocks does not unblock the stream when the
// second block refers to a newer, still unacked entry.
#[test]
fn max_block_streams4() {
let mut encoder = connect(false);
assert!(encoder.encoder.set_max_capacity(200).is_ok());
// change capacity to 200.
encoder.send_instructions(CAP_INSTRUCTION_200);
encoder.encoder.set_max_blocked_streams(1).unwrap();
assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
// send a header block, that creates an new entry and refers to it.
let buf = encoder.encoder.encode_header_block(
&mut encoder.conn,
&[Header::new("name1", "value1")],
STREAM_1,
);
assert_is_index_to_dynamic_post(&buf);
assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
// another header block to already blocked stream can still create a new entry.
let buf = encoder.encoder.encode_header_block(
&mut encoder.conn,
&[Header::new("name2", "value2")],
STREAM_1,
);
assert_is_index_to_dynamic_post(&buf);
assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
// receive a header_ack for the first header block.
recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
// The stream is still blocking because the second header block is not acked.
assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
}
// Two header blocks on one stream refer to the same entry; a single header ack unblocks
// the stream.
#[test]
fn max_block_streams5() {
    let mut encoder = connect(false);

    // Raise the dynamic table capacity to 200 and check the capacity instruction.
    assert!(encoder.encoder.set_max_capacity(200).is_ok());
    encoder.send_instructions(CAP_INSTRUCTION_200);

    // Allow at most one blocked stream; nothing is blocked yet.
    encoder.encoder.set_max_blocked_streams(1).unwrap();
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);

    let headers = [Header::new("name1", "value1")];

    // First header block on stream 1: creates a new entry and refers to it.
    let first_block = encoder.encoder.encode_header_block(
        &mut encoder.conn,
        &headers,
        STREAM_1,
    );
    assert_is_index_to_dynamic_post(&first_block);
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

    // Second header block on the same stream with the same header: no new entry is created,
    // the block simply refers to the entry inserted above.
    let second_block = encoder.encoder.encode_header_block(
        &mut encoder.conn,
        &headers,
        STREAM_1,
    );
    assert_is_index_to_dynamic(&second_block);
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

    // Receive a header ack for the first header block.
    recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);

    // The stream is not blocking anymore because the header ack also acks the instruction.
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
}
// A header ack for the later of two inserts unblocks both streams.
#[test]
fn max_block_streams6() {
    let mut encoder = connect(false);

    // Raise the dynamic table capacity to 200 and check the capacity instruction.
    assert!(encoder.encoder.set_max_capacity(200).is_ok());
    encoder.send_instructions(CAP_INSTRUCTION_200);

    // Allow two blocked streams; nothing is blocked yet.
    encoder.encoder.set_max_blocked_streams(2).unwrap();
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);

    let name1_headers = [Header::new("name1", "value1")];
    let name2_headers = [Header::new("name2", "value2")];

    // Stream 1: creates a new entry and refers to it.
    let block_on_stream1 = encoder.encoder.encode_header_block(
        &mut encoder.conn,
        &name1_headers,
        STREAM_1,
    );
    assert_is_index_to_dynamic_post(&block_on_stream1);
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

    // Stream 2: the limit is not reached yet, so this block creates a new entry as well.
    let block_on_stream2 = encoder.encoder.encode_header_block(
        &mut encoder.conn,
        &name2_headers,
        STREAM_2,
    );
    assert_is_index_to_dynamic_post(&block_on_stream2);
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);

    // Receive a header ack for the second header block. This acks the first insert as well.
    recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_2);

    // Neither stream is blocking anymore because the header ack also acks the instructions.
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
}
// Cancelling one of two streams that refer to the same unacked entry unblocks only the
// cancelled stream.
#[test]
fn max_block_streams7() {
    let mut encoder = connect(false);

    // Raise the dynamic table capacity to 200 and check the capacity instruction.
    assert!(encoder.encoder.set_max_capacity(200).is_ok());
    encoder.send_instructions(CAP_INSTRUCTION_200);

    // Allow two blocked streams; nothing is blocked yet.
    encoder.encoder.set_max_blocked_streams(2).unwrap();
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);

    let headers = [Header::new("name1", "value1")];

    // Stream 1: creates a new entry and refers to it.
    let block_on_stream1 = encoder.encoder.encode_header_block(
        &mut encoder.conn,
        &headers,
        STREAM_1,
    );
    assert_is_index_to_dynamic_post(&block_on_stream1);
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

    // Stream 2 sends the same header: it refers to the entry inserted above instead of
    // creating a new one, and becomes blocked as well.
    let block_on_stream2 = encoder.encoder.encode_header_block(
        &mut encoder.conn,
        &headers,
        STREAM_2,
    );
    assert_is_index_to_dynamic(&block_on_stream2);
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);

    // Receive a stream cancellation for the first stream. This removes the first stream from
    // the blocked set, but it does not mark the instruction as acked.
    recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);

    // The second stream is still blocking.
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
}
// An Insert Count Increment for the first entry unblocks only the stream that depends on
// nothing newer than that entry.
#[test]
fn max_block_stream8() {
    let mut encoder = connect(false);

    // Raise the dynamic table capacity to 200 and check the capacity instruction.
    assert!(encoder.encoder.set_max_capacity(200).is_ok());
    encoder.send_instructions(CAP_INSTRUCTION_200);

    // Allow two blocked streams; nothing is blocked yet.
    encoder.encoder.set_max_blocked_streams(2).unwrap();
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);

    let name1_headers = [Header::new("name1", "value1")];
    let name2_headers = [Header::new("name2", "value2")];

    // Stream 1: creates a new entry and refers to it.
    let first_on_stream1 = encoder.encoder.encode_header_block(
        &mut encoder.conn,
        &name1_headers,
        STREAM_1,
    );
    assert_is_index_to_dynamic_post(&first_on_stream1);
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

    // Stream 2: refers to the same entry.
    let block_on_stream2 = encoder.encoder.encode_header_block(
        &mut encoder.conn,
        &name1_headers,
        STREAM_2,
    );
    assert_is_index_to_dynamic(&block_on_stream2);
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);

    // Stream 1 sends another header block, which creates a second entry.
    let second_on_stream1 = encoder.encoder.encode_header_block(
        &mut encoder.conn,
        &name2_headers,
        STREAM_1,
    );
    assert_is_index_to_dynamic_post(&second_on_stream1);
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);

    // Stream 1 is blocked on entries 1 and 2; stream 2 is blocked only on entry 1.
    // Receive an Insert Count Increment for the first entry.
    recv_instruction(&mut encoder, &[0x01]);

    // After that only stream 1 is blocking.
    assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
}
// An entry can be evicted only after it is acked (Insert Count Increment) and the header
// block referring to it is acked as well (header ack).
#[test]
fn dynamic_table_can_evict1() {
    let mut encoder = connect(false);

    // Change the capacity to 60 and check the capacity instruction.
    assert!(encoder.encoder.set_max_capacity(60).is_ok());
    encoder.send_instructions(CAP_INSTRUCTION_60);
    encoder.encoder.set_max_blocked_streams(2).unwrap();

    // Insert "content-length: 1234" and check the insert instruction.
    assert!(encoder
        .encoder
        .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1)
        .is_ok());
    encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

    // Send a header block that refers to the unacked entry.
    let content_length = [Header::new("content-length", "1234")];
    let header_block = encoder.encoder.encode_header_block(
        &mut encoder.conn,
        &content_length,
        STREAM_1,
    );
    assert_is_index_to_dynamic(&header_block);

    // Trying to evict the entry fails.
    assert!(encoder.change_capacity(10).is_err());

    // Receive an Insert Count Increment for the entry.
    recv_instruction(&mut encoder, &[0x01]);

    // Eviction still fails: the stream is still referring to the entry.
    assert!(encoder.change_capacity(10).is_err());

    // Receive a header ack for the header block.
    recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);

    // Now the entry can be evicted.
    assert!(encoder.change_capacity(10).is_ok());
}
// An acked entry that is still referred to by a stream becomes evictable once that stream
// is cancelled.
#[test]
fn dynamic_table_can_evict2() {
    let mut encoder = connect(false);

    // Change the capacity to 60 and check the capacity instruction.
    assert!(encoder.encoder.set_max_capacity(60).is_ok());
    encoder.send_instructions(CAP_INSTRUCTION_60);
    encoder.encoder.set_max_blocked_streams(2).unwrap();

    // Insert "content-length: 1234" and check the insert instruction.
    assert!(encoder
        .encoder
        .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1)
        .is_ok());
    encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

    // Send a header block that refers to the unacked entry.
    let content_length = [Header::new("content-length", "1234")];
    let header_block = encoder.encoder.encode_header_block(
        &mut encoder.conn,
        &content_length,
        STREAM_1,
    );
    assert_is_index_to_dynamic(&header_block);

    // Trying to evict the entry fails.
    assert!(encoder.change_capacity(10).is_err());

    // Receive an Insert Count Increment for the entry.
    recv_instruction(&mut encoder, &[0x01]);

    // Eviction still fails: the stream is still referring to the entry.
    assert!(encoder.change_capacity(10).is_err());

    // Receive a stream cancellation for the stream.
    recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);

    // Now the entry can be evicted.
    assert!(encoder.change_capacity(10).is_ok());
}
// An entry that no header block refers to becomes evictable as soon as it is acked.
#[test]
fn dynamic_table_can_evict3() {
    let mut encoder = connect(false);

    // Change the capacity to 60 and check the capacity instruction.
    assert!(encoder.encoder.set_max_capacity(60).is_ok());
    encoder.send_instructions(CAP_INSTRUCTION_60);
    encoder.encoder.set_max_blocked_streams(2).unwrap();

    // Insert "content-length: 1234" and check the insert instruction.
    assert!(encoder
        .encoder
        .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1)
        .is_ok());
    encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

    // Trying to evict the entry fails because the entry is not acked.
    assert!(encoder.change_capacity(10).is_err());

    // Receive an Insert Count Increment for the entry.
    recv_instruction(&mut encoder, &[0x01]);

    // Now the entry can be evicted.
    assert!(encoder.change_capacity(10).is_ok());
}
// A header ack alone is enough to make an entry evictable: it acks both the header block
// and the insert instruction.
#[test]
fn dynamic_table_can_evict4() {
    let mut encoder = connect(false);

    // Change the capacity to 60 and check the capacity instruction.
    assert!(encoder.encoder.set_max_capacity(60).is_ok());
    encoder.send_instructions(CAP_INSTRUCTION_60);
    encoder.encoder.set_max_blocked_streams(2).unwrap();

    // Insert "content-length: 1234" and check the insert instruction.
    assert!(encoder
        .encoder
        .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1)
        .is_ok());
    encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

    // Send a header block that refers to the unacked entry.
    let content_length = [Header::new("content-length", "1234")];
    let header_block = encoder.encoder.encode_header_block(
        &mut encoder.conn,
        &content_length,
        STREAM_1,
    );
    assert_is_index_to_dynamic(&header_block);

    // Trying to evict the entry fails: the stream is still referring to it and the entry
    // is not acked.
    assert!(encoder.change_capacity(10).is_err());

    // Receive a header ack for the header block. This also acks the instruction.
    recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);

    // Now the entry can be evicted.
    assert!(encoder.change_capacity(10).is_ok());
}
// Checks that the encoder stops adding headers to the dynamic table when the corresponding
// instruction cannot be written due to the flow control limit, and that it starts adding
// them again after a flow control update has been exchanged.
#[test]
fn encoder_flow_controlled_blocked() {
// Flow control limit passed to `connect_flow_control`.
const SMALL_MAX_DATA: u64 = 20;
// Expected encoder instruction bytes after the first two header blocks: a single insert.
const ONE_INSTRUCTION_1: &[u8] = &[
0x67, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x7f, 0x83, 0x8, 0x99, 0x6b,
];
// Expected encoder instruction bytes after the third header block: again a single insert.
const ONE_INSTRUCTION_2: &[u8] = &[
0x67, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x37, 0x83, 0x8, 0x99, 0x6b,
];
let mut encoder = connect_flow_control(SMALL_MAX_DATA);
// Set max blocked streams to 20 and change the capacity to 1000.
encoder.encoder.set_max_blocked_streams(20).unwrap();
assert!(encoder.encoder.set_max_capacity(1000).is_ok());
encoder.send_instructions(CAP_INSTRUCTION_1000);
// Encode a header block with 2 headers. The first header will be added to the dynamic
// table. The second will not be added to the dynamic table, because the
// corresponding instruction cannot be written immediately due to the flow control
// limit.
let buf1 = encoder.encoder.encode_header_block(
&mut encoder.conn,
&[
Header::new("something", "1234"),
Header::new("something2", "12345678910"),
],
STREAM_1,
);
// Assert that the first header is encoded as an index to the dynamic table (a post form).
assert_eq!(buf1[2], 0x10);
// Assert that the second header is encoded as a literal with a name literal.
assert_eq!(buf1[3] & 0xf0, 0x20);
// Try to encode another header block. Here both headers will be encoded as a literal with a
// name literal; only the first header of the block is checked below.
let buf2 = encoder.encoder.encode_header_block(
&mut encoder.conn,
&[
Header::new("something3", "1234"),
Header::new("something4", "12345678910"),
],
STREAM_2,
);
assert_eq!(buf2[2] & 0xf0, 0x20);
// Ensure that only one instruction has been sent: the insert for ("something", "1234").
encoder.send_instructions(ONE_INSTRUCTION_1);
// Exchange a flow control update: the peer produces output and the local connection
// processes it. The local connection's own output is discarded.
let out = encoder.peer_conn.process(None, now());
mem::drop(encoder.conn.process(out.as_dgram_ref(), now()));
// Try writing a new header block. Now, headers will be added to the dynamic table again,
// because instructions can be sent.
let buf3 = encoder.encoder.encode_header_block(
&mut encoder.conn,
&[
Header::new("something5", "1234"),
Header::new("something6", "12345678910"),
],
StreamId::new(3),
);
// Assert that the first header is encoded as an index to the dynamic table (a post form).
assert_eq!(buf3[2], 0x10);
// Assert that the second header is encoded as a literal with a name literal.
assert_eq!(buf3[3] & 0xf0, 0x20);
// Assert that one instruction has been sent.
encoder.send_instructions(ONE_INSTRUCTION_2);
}
// Requesting a capacity of 2000 results in the capacity instruction for 1500 being sent.
// NOTE(review): presumably the request is capped at the encoder's own maximum table size
// configured by `connect` — confirm against that helper.
#[test]
fn encoder_max_capacity_limit() {
    let mut encoder = connect(false);

    // Ask for a capacity of 2000; the call itself succeeds.
    let capacity_result = encoder.encoder.set_max_capacity(2000);
    assert!(capacity_result.is_ok());

    // The instruction that goes out is the one for a capacity of 1500.
    encoder.send_instructions(CAP_INSTRUCTION_1500);
}
// An entry referred to by the header block that is currently being encoded must not be
// evicted to make room for a later header of the same block.
#[test]
fn test_do_not_evict_entry_that_are_referred_only_by_the_same_header_blocked_encoding() {
    let mut encoder = connect(false);
    encoder.encoder.set_max_blocked_streams(20).unwrap();
    assert!(encoder.change_capacity(50).is_ok());

    // Insert ("something5", "1234") and flush the encoder instructions.
    encoder
        .encoder
        .send_and_insert(&mut encoder.conn, b"something5", b"1234")
        .unwrap();
    encoder
        .encoder
        .send_encoder_updates(&mut encoder.conn)
        .unwrap();

    // Hand the local connection's output to the peer; the peer's response is discarded.
    let out = encoder.conn.process(None, now());
    mem::drop(encoder.peer_conn.process(out.as_dgram_ref(), now()));

    // Receive an insert count increment.
    recv_instruction(&mut encoder, &[0x01]);

    // The first header will use the table entry and the second will use the literal
    // encoding because the first entry is referred to and cannot be evicted.
    let headers = [
        Header::new("something5", "1234"),
        Header::new("something6", "1234"),
    ];
    let encoded = encoder
        .encoder
        .encode_header_block(&mut encoder.conn, &headers, StreamId::new(3))
        .to_vec();
    assert_eq!(
        encoded,
        &[
            0x02, 0x00, 0x80, 0x27, 0x03, 0x73, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x69, 0x6e, 0x67,
            0x36, 0x04, 0x31, 0x32, 0x33, 0x34
        ]
    );

    // Also check that there is no new instruction sent by the encoder.
    assert!(encoder.conn.process_output(now()).dgram().is_none());
}
// Receives a stream cancellation for a stream with an unacked header block, followed by an
// Insert Count Increment.
#[test]
fn test_streams_cancel_cleans_up_unacked_header_blocks() {
    let mut encoder = connect(false);
    encoder.encoder.set_max_blocked_streams(10).unwrap();

    // Change the capacity to 60 and check the capacity instruction.
    assert!(encoder.change_capacity(60).is_ok());
    encoder.send_instructions(CAP_INSTRUCTION_60);

    // Insert "content-length: 1234".
    encoder.insert(
        HEADER_CONTENT_LENGTH,
        VALUE_1,
        HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL,
    );

    // Send a header block on stream 1 that refers to the inserted entry; no encoder
    // instruction is expected for it.
    let stream_id = StreamId::new(1);
    let content_length = [Header::new("content-length", "1234")];
    encoder.encode_header_block(
        stream_id,
        &content_length,
        ENCODE_INDEXED_REF_DYNAMIC,
        &[],
    );

    // Receive a stream cancellation for that stream, then an Insert Count Increment.
    // NOTE(review): there is no explicit assertion afterwards; presumably the test relies on
    // `recv_instruction` failing if the encoder mishandles the cancelled stream — confirm
    // against that helper.
    recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);
    recv_instruction(&mut encoder, &[0x01]);
}
}