Now that we have all the necessary datastructures we can build the router itself. Most of the routing table logic is handled by the bit_tree and bucket modules. The router just ties these together and handles I/O.
Before actually running the routing table the router has to find out its own address, as it is seen from the outside world. It does this by sending +end signals to a list of known telehash nodes (eg telehash.org:42424).
record(bootstrap, { % the state of the router when bootstrapping
timeout, % give up if no address received before this time
addresses % list of addresses contacted to find out our address
}).
bootstrap(Addresses, Timeout) ->
?INFO([bootstrapping]),
State = #bootstrap{timeout=Timeout, addresses=Addresses},
{ok, _Pid} = gen_server:start_link(?MODULE, State, []).
init(State) ->
switch:listen(),
case State of
#bootstrap{timeout=Timeout, addresses=Addresses} ->
Telex = telex:end_signal(util:random_end()),
lists:foreach(fun (Address) -> switch:send(Address, Telex) end, Addresses),
erlang:send_after(Timeout, self(), giveup);
#state{} ->
ok
end,
{ok, State}.
Then we listen until we either get a reply with a _to field or run out of time.
handle_info({switch, {recv, From, Telex}}, #bootstrap{addresses=Addresses}=Bootstrap) ->
% bootstrapping, waiting to receive a message telling us our own address
case {lists:member(From, Addresses), telex:get(Telex, '_to')} of
{true, {ok, Binary}} ->
try util:to_end(util:binary_to_address(Binary)) of
End ->
Self = util:to_bits(End),
Table = touched(From, Self, empty_table(Self)),
dialer:dial(End, [From], ?ROUTER_DIAL_TIMEOUT),
refresh(Self, Table),
?INFO([bootstrap, finished, {self, Binary}, {from, From}]),
{noreply, #state{self=Self, pinged=sets:new(), table=Table}}
catch
_ ->
?WARN([bootstrap, bad_self, {self, Binary}, {from, From}]),
{noreply, Bootstrap}
end;
_ ->
{noreply, Bootstrap}
end;
handle_info(giveup, #bootstrap{}=Bootstrap) ->
% failed to bootstrap, die
?INFO([giveup, {state, Bootstrap}]),
{stop, {shutdown, gaveup}, Bootstrap};
Once we know our own address we can fill in the state record and start managing the routing table.
-record(state, { % the state of the router in normal operation
self, % the bits of the routers own end
pinged, % set of addresses which have been pinged and not yet replied/timedout
table % the routing table, a bit_tree containing buckets of nodes
}).
One of the jobs of the router is to remove unresponsive nodes from the routing table. To check if a node is responsive we just a random +end signal and wait for a reply. If the node is unresponsive it gets marked as stale and we try to find a suitable replacement. The node won't actually be dropped from the table until a replacement is found - this prevents the table from getting flushed if our network connection goes down.
ping(To) ->
Telex = telex:end_signal(util:random_end()),
% do this in a message to self to avoid some awkward control flow
self() ! {pinging, To},
switch:send(To, Telex),
erlang:send_after(?ROUTER_PING_TIMEOUT, self(), {timeout, Address}).
timedout(Address, Self, Table) ->
bit_tree:update(
fun (_Suffix, _Depth, _Gap, Bucket) ->
case bucket:timedout(Address, Bucket) of
{node, Node, Update} ->
% try to touch this node, might be suitable replacement
ping(Node),
Update;
Update ->
Update
end
end,
util:to_bits(Address),
Self,
Table
).
handle_info({pinging, Address}, #state{pinged=Pinged}=State) ->
% do this in a message to self to avoid some awkward control flow
?INFO([recording_ping, {address, Address}]),
Pinged2 = sets:add_element(Address, Pinged),
{noreply, State#state{pinged=Pinged2}};
handle_info({timeout, Address}, #state{self=Self, pinged=Pinged, table=Table}=State) ->
case lists:member(Address, Pinged) of
true ->
% ping timedout
?INFO([timeout, {address, Address}]),
Table2 = timedout(Address, Self, Table),
{ok, State#state{table=Table2}};
false ->
% address already replied
{ok, State}
end;
One of the rules of the router is that it should never pass on information about a node that it hasn't personally confirmed to exist. Once we receive a message from a node we know that it exists (later we will implement _ring/_line to protect against address spoofing):
touched(Address, Self, Table) ->
bit_tree:update(
fun (Suffix, _Depth, Gap, Bucket) ->
May_split = (Gap < ?K), % !!! or (Depth < ?ROUTER_TABLE_EXPANSION)
bucket:touched(Address, Suffix, now(), Bucket, May_split)
end,
util:to_bits(Address),
Self,
Table
).
On receiving a .see command we record all the contained addresses as potential nodes and ping them to try to confirm their existence.
seen(Address, Self, Table) ->
bit_tree:update(
fun (Suffix, _Depth, _Gap, Bucket) ->
case bucket:seen(Address, Suffix, now(), Bucket) of
{node, Node, Update} ->
% check if this node is stale
ping(Node),
Update;
Update ->
Update
end
end,
util:to_bits(Address),
Self,
Table
).
On receiving a +end signal we reply with a .see command containing the nearest K addresses which we have confirmed to exist.
see(To, End, Table) ->
Telex = telex:see_command(nearest(?K, End, Table)),
switch:send(To, Telex).
nearest(N, End, Table) when N>=0 ->
Bits = util:to_bits(End),
iter:take(
N,
iter:flatten(
iter:map(
fun ({_Prefix, Bucket}) -> bucket:by_dist(End, Bucket) end,
bit_tree:iter(Bits, Table)))).
On receiving a message we have handle the above three cases, which gets a little ugly.
handle_info({switch, {recv, From, Telex}}, #state{self=Self, pinged=Pinged, table=Table}=State) ->
% this counts as a reply
Pinged2 = sets:del_element(From, Pinged),
% touched the sender
% !!! eventually will check _line here
?INFO([touched, {node, From}]),
Table2 = touched(From, Self, Table),
% maybe seen some nodes
Table3 =
case telex:get(Telex, '.see') of
{ok, Binaries} ->
try [util:binary_to_address(Bin) || Bin <- Binaries] of
Addresses ->
?INFO([seen, {nodes, Addresses}, {from, From}]),
lists:foldl(fun (Address, Table_acc) -> seen(Address, Self, Table_acc) end, Table2, Addresses)
catch
_ ->
?INFO([bad_seen, {nodes, Binaries}, {from, From}]),
Table2
end;
_ ->
Table2
end,
% maybe send some nodes back
case telex:get(Telex, '+end') of
{ok, Hex} ->
try util:hex_to_end(Hex) of
End ->
?INFO([see, {'end', End}, {from, From}]),
see(From, End, Table3)
catch
_ ->
?WARN([bad_see, {'end', Hex}, {from, From}])
end;
_ ->
ok
end,
{noreply, State#state{pinged=Pinged2, table=Table2}};
The last responsibility of the router is to periodically refresh buckets which haven't recently seen any activity.
handle_info(refresh, #state{self=Self, table=Table}=State) ->
?INFO([refreshing_table]),
refresh(Self, Table),
{noreply, State};
handle_info({dialed, _, _}, State) ->
% response from a bucket refresh, we don't care
{noreply, State};
dialed(Address, Self, Table) ->
bit_tree:update(
fun (_Suffix, _Depth, _Gap, Bucket) ->
bucket:dialed(now(), Bucket)
end,
util:to_bits(Address),
Self,
Table
).
needs_refresh(Bucket, Now) ->
case bucket:last_dialed(Bucket) of
never ->
true;
Last ->
(timer:now_diff(Now, Last) div 1000) < ?ROUTER_REFRESH_TIME
end.
refresh(Self, Table) ->
Now = now(),
iter:foreach(
fun ({Prefix, Bucket}) ->
case needs_refresh(Bucket, Now) of
true ->
?INFO([refreshing_bucket, {prefix, Prefix}, {bucket, Bucket}]),
To = util:random_end(Prefix),
From = nearest(?K, To, Table),
dialer:dial(To, From, ?ROUTER_DIAL_TIMEOUT);
false ->
ok
end
end,
bit_tree:iter(Self, Table)
),
erlang:send_after(?ROUTER_REFRESH_TIME, self(), refresh),
ok.
That's it. As usual the (untested) code is in the repo. The next post will probably deal with taps.