14. Source code
Here you can find some useful functions used in the next notebook. You should copy the code below and save it in a file named `Tnet.py` inside the folder `src`.
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix, diags
def ComputeReachabilityVector(df, T):
    '''Given a temporal network in the form ijt contained in the dataframe df, this function computes the reachability of time-respecting walks of length T

    Use: reachability = ComputeReachabilityVector(df, T)

    Inputs:
        * df (pandas dataframe): temporal network in the form ijt. Node labels in the columns i and j can be arbitrary sortable values: they are mapped internally to the integers 0, ..., n-1. The dataframe is not modified.
        * T (int): number of steps in the walk. Only the time stamps 0, ..., T-1 (after shifting the smallest time stamp to 0) are used.

    Outputs:
        * reachability (array): returns the T reachability values, one per snapshot. An empty array is returned if T <= 0.
    '''

    # nothing to compute without at least one snapshot
    if T <= 0:
        return np.array([])

    # create a mapping between the nodes and the integers 0, ..., n-1:
    # all_nodes is sorted and duplicate-free, so the position of a label in it is its index
    all_nodes = np.unique(df[['i', 'j']].to_numpy())
    n = len(all_nodes)
    row = np.searchsorted(all_nodes, df['i'].to_numpy())
    col = np.searchsorted(all_nodes, df['j'].to_numpy())

    # shift the smallest time-stamp to 0, working on a local array so that the caller's dataframe is left untouched
    time = df['t'].to_numpy()
    if len(time) > 0:
        time = time - time.min()

    # build the snapshot adjacency matrices, in which each node is connected to itself
    At = []
    for t in range(T):
        snapshot = diags(np.ones(n))
        active = time == t

        if active.any():
            A = csr_matrix((np.ones(active.sum()), (row[active], col[active])), shape = (n,n))
            # csr_matrix sums repeated (i, j) entries and A + A.T doubles reciprocated ones:
            # sign() brings every non-zero entry back to 1
            snapshot = (snapshot + A + A.T).sign()

        At.append(snapshot)

    # compute the reachability vector given the adjacency matrices list
    reachability = ReachabilityFromAdjacencyList(At)

    return reachability
def ReachabilityFromAdjacencyList(At):
    '''This function computes the reachability of time respecting paths given a sequence of adjacency matrices with self loops

    Use: reachability = ReachabilityFromAdjacencyList(At)

    Inputs:
        * At (list of sparse arrays): the entries of this list are the adjacency matrices of each snapshot. Only the sparsity pattern matters: every non-zero entry is treated as an edge.

    Outputs:
        * reachability (array): returns the T reachability values, one per snapshot, where T is the number of time frames. Each value is the mean of the binary reachability matrix at that step. An empty array is returned for an empty list.
    '''

    T = len(At)

    # no snapshot, no reachability value
    if T == 0:
        return np.array([])

    # reachability matrix R_{ij}(t) = 1 if j can be reached from i in t steps or fewer.
    # The first snapshot is binarized like all the following ones, so that weighted entries
    # (e.g. repeated contacts) do not inflate the first value
    R = csr_matrix(At[0]).sign()
    reachability = [R.mean()]

    # only the current matrix is needed for the update: keep its mean and discard the rest
    for t in range(1, T):
        print(f'Progress: {int(100*t/T)}%', end = '\r')
        R = (At[t]@R).sign()
        reachability.append(R.mean())

    return np.array(reachability)
def ComputeIntereventStatistic(df):
    '''Given a contact network in the format ijtτ, this function computes the time elapsed between the end of an
    interaction between two nodes (ij) and the beginning of the next one between the same nodes.
    This is repeated for all pairs of nodes.

    Use: intervals = ComputeIntereventStatistic(df)

    Inputs:
        * df (pandas dataframe): temporal network in the format ijtτ. The end of a contact is taken to be t + τ. The dataframe is not modified.

    Outputs:
        * intervals (array): interevent values of all pairs; a pair with k contacts contributes k-1 values. An empty array is returned if no pair interacts more than once.
    '''

    # order the contacts chronologically (on a copy) so that consecutive rows of a pair are consecutive contacts
    ordered = df.sort_values('t', kind = 'stable')

    # visit each (i, j) pair exactly once, in order of first appearance
    by_pair = ordered.groupby(['i', 'j'], sort = False)
    n_pairs = by_pair.ngroups
    intervals = []

    for k, (_, contacts) in enumerate(by_pair):
        print(f'Progress: {int(k/n_pairs*100)}%', end = '\r')

        # a single contact has no interevent time
        if len(contacts) > 1:
            start = contacts['t'].to_numpy()
            end = start + contacts['τ'].to_numpy()

            # beginning of each contact minus the end of the previous one
            intervals.append(start[1:] - end[:-1])

    # np.concatenate refuses an empty list
    if not intervals:
        return np.array([])

    return np.concatenate(intervals)