连接失败的嗅探

938 发布于: 2021-03-25 读完约需 1 分钟

默认情况下,当使用允许重播的连接池时,将启用嗅探连接,唯一允许这样做的连接池是嗅探连接池(Sniffing connection pool)

通过询问Elasticsearch集群本身来强制刷新连接池已知的健康节点非常方便,嗅探器通过询问它当前知道的每个节点来获取节点,直到有一个节点作出响应。

以下示例中,在端口9200-9204上开启了5个已知节点,我们认为其中9202,9203,9204是符合主节点条件的节点。当在9201端口上进行搜索时,会抛出引发嗅探的Once失败:

var audit = new Auditor(() => VirtualClusterWith
    .Nodes(5)
    .MasterEligible(9202, 9203, 9204)
    .ClientCalls(r => r.SucceedAlways())
    .ClientCalls(r => r.OnPort(9201).Fails(Once)) 
    .Sniff(p => p.SucceedAlways(VirtualClusterWith
        .Nodes(3)
        .MasterEligible(9200, 9202)
        .ClientCalls(r => r.OnPort(9201).Fails(Once))
        .ClientCalls(r => r.SucceedAlways())
        .Sniff(s => s.SucceedAlways(VirtualClusterWith 
            .Nodes(3, 9210)
            .MasterEligible(9210, 9212)
            .ClientCalls(r => r.SucceedAlways())
            .Sniff(r => r.SucceedAlways())
        ))
    ))
    .SniffingConnectionPool()
    .Settings(s => s.DisablePing().SniffOnStartup(false))
);

audit = await audit.TraceCalls(
/** */
    new ClientCall {
        { HealthyResponse, 9200 },
        { pool =>  pool.Nodes.Count.Should().Be(5) }
    },
    new ClientCall {
        { BadResponse, 9201},
        { SniffOnFail },
        { SniffSuccess, 9202}, 
        { HealthyResponse, 9200},
        { pool =>  pool.Nodes.Count.Should().Be(3) } 
    },
    new ClientCall {
        { BadResponse, 9201},
        { SniffOnFail }, 
        { SniffSuccess, 9200},
        { HealthyResponse, 9210},
        { pool =>  pool.Nodes.Count.Should().Be(3) }
    },
    new ClientCall { { HealthyResponse, 9211 } },
    new ClientCall { { HealthyResponse, 9212 } },
    new ClientCall { { HealthyResponse, 9210 } },
    new ClientCall { { HealthyResponse, 9211 } },
    new ClientCall { { HealthyResponse, 9212 } },
    new ClientCall { { HealthyResponse, 9210 } },
    new ClientCall { { HealthyResponse, 9211 } },
    new ClientCall { { HealthyResponse, 9212 } },
    new ClientCall { { HealthyResponse, 9210 } }
);

在Ping失败后嗅探(Sniffing after ping failure)

这里对测试集群的设置与前面的设置完全相同,只是启用了Ping(默认为true)并使Ping失败:

var audit = new Auditor(() => VirtualClusterWith
    .Nodes(5)
    .MasterEligible(9202, 9203, 9204)
    .Ping(r => r.OnPort(9201).Fails(Once))
    .Ping(r => r.SucceedAlways())
    .ClientCalls(c=>c.SucceedAlways())
    .Sniff(p => p.SucceedAlways(VirtualClusterWith
        .Nodes(3)
        .MasterEligible(9200, 9202)
        .Ping(r => r.OnPort(9201).Fails(Once))
        .Ping(r => r.SucceedAlways())
        .ClientCalls(c=>c.SucceedAlways())
        .Sniff(s => s.SucceedAlways(VirtualClusterWith
            .Nodes(3, 9210)
            .MasterEligible(9210, 9211)
            .Ping(r => r.SucceedAlways())
            .Sniff(r => r.SucceedAlways())
            .ClientCalls(c=>c.SucceedAlways())
        ))
    ))
    .SniffingConnectionPool()
    .Settings(s => s.SniffOnStartup(false))
);

audit = await audit.TraceCalls(
    new ClientCall {
        { PingSuccess, 9200 },
        { HealthyResponse, 9200 },
        { pool =>  pool.Nodes.Count.Should().Be(5) }
    },
    new ClientCall {
        { PingFailure, 9201},
        { SniffOnFail }, // 断言对第一个已知的主节点9202进行嗅探
        { SniffSuccess, 9202},
        { PingSuccess, 9200},
        { HealthyResponse, 9200},
        { pool =>  pool.Nodes.Count.Should().Be(3) } // 连接池现在应该有三个节点
    },
    new ClientCall {
        { PingFailure, 9201},
        { SniffOnFail }, // 断言对更新后的集群中的第一个主节点进行嗅探
        { SniffSuccess, 9200},
        { PingSuccess, 9210},
        { HealthyResponse, 9210},
        { pool =>  pool.Nodes.Count.Should().Be(3) }
    },
    new ClientCall {
        { PingSuccess, 9211 },
        { HealthyResponse, 9211 }
    },
    new ClientCall {
        { PingSuccess, 9212 },
        { HealthyResponse, 9212 }
    },
    new ClientCall { { HealthyResponse, 9210 } }, // 嗅探器返回新节点后,已经ping通了9210
    new ClientCall { { HealthyResponse, 9211 } },
    new ClientCall { { HealthyResponse, 9212 } },
    new ClientCall { { HealthyResponse, 9210 } }
);

客户端发布节点地址

var audit = new Auditor(() => VirtualClusterWith
        .Nodes(2)
        .MasterEligible(9200)
        .ClientCalls(c=>c.SucceedAlways())
        .Ping(r => r.OnPort(9200).Fails(Once))
        .Ping(c=>c.SucceedAlways())
        .Sniff(p => p.SucceedAlways(VirtualClusterWith
            .Nodes(10)
            .MasterEligible(9200, 9202, 9201)
            .PublishAddress("10.0.12.1")
            .ClientCalls(c=>c.SucceedAlways())
            .Ping(c=>c.SucceedAlways())
        ))
        .SniffingConnectionPool()
        .Settings(s => s.SniffOnStartup(false))
);

void HostAssert(Audit a, string host, int expectedPort)
{
    a.Node.Uri.Host.Should().Be(host);
    a.Node.Uri.Port.Should().Be(expectedPort);
}
void SniffUrlAssert(Audit a, string host, int expectedPort)
{
    HostAssert(a, host, expectedPort);
    var sniffUri = new UriBuilder(a.Node.Uri)
    {
        Path = RequestPipeline.SniffPath,
        Query = "flat_settings=true&timeout=2s"
    }.Uri;
    sniffUri.PathEquals(a.Path, nameof(SniffUrlAssert));
}

audit = await audit.TraceCalls(
    new ClientCall {
        { PingFailure, a => HostAssert(a, "localhost", 9200)},
        { SniffOnFail },
        { SniffSuccess, a => SniffUrlAssert(a, "localhost", 9200)},
        { PingSuccess, a => HostAssert(a, "10.0.12.1", 9200)},
        { HealthyResponse,  a => HostAssert(a, "10.0.12.1", 9200)},
        { pool =>  pool.Nodes.Count.Should().Be(10) } // 连接池现在应该有10个节点
    }
);

版权声明:本作品系原创,版权归码友网所有,如未经许可,禁止任何形式转载,违者必究。

发表评论

登录用户才能发表评论, 请 登 录 或者 注册