Skip to content

Commit

Permalink
add fpca using h space and MFPCA
Browse files Browse the repository at this point in the history
  • Loading branch information
jdtuck committed Apr 9, 2024
1 parent e5ae269 commit 3519d64
Showing 1 changed file with 317 additions and 0 deletions.
317 changes: 317 additions & 0 deletions fdasrsf/fPCA.py
Expand Up @@ -696,6 +696,246 @@ def plot(self):
return


class fdajpcah:
"""
This class provides joint fPCA using the
SRVF framework using MFPCA
Usage: obj = fdajpcah(warp_data)
:param warp_data: fdawarp class with alignment data
:param q_pca: srvf principal directions
:param f_pca: f principal directions
:param latent: latent values
:param coef: principal coefficients
:param id: point used for f(0)
:param mqn: mean srvf
:param U_q: eigenvectors for q
:param U_h: eigenvectors for gam
:param C: scaling value
:param stds: geodesic directions
Author : J. D. Tucker (JDT) <jdtuck AT sandia.gov>
Date : 06-April-2024
"""

def __init__(self, fdawarp):
"""
Construct an instance of the fdavpca class
:param fdawarp: fdawarp class
"""
if fdawarp.fn.size == 0:
raise Exception("Please align fdawarp class using srsf_align!")

self.warp_data = fdawarp
self.M = fdawarp.time.shape[0]

def calc_fpca(
self,
var_exp=0.99,
stds=np.arange(-1.0, 2.0),
id=None,
parallel=False,
cores=-1,
):
"""
This function calculates joint functional principal component analysis
on aligned data
:param var_exp: compute no based on value percent variance explained
(default: 0.99)
:param id: point to use for f(0) (default = midpoint)
:param stds: number of standard deviations along gedoesic to compute
(default = -1,0,1)
:param parallel: run in parallel (default = F)
:param cores: number of cores for parallel (default = -1 (all))
:type no: int
:type id: int
:type parallel: bool
:type cores: int
:rtype: fdajpcah object of numpy ndarray
:return q_pca: srsf principal directions
:return f_pca: functional principal directions
:return latent: latent values
:return coef: coefficients
:param U_q: eigenvectors for q
:param U_h: eigenvectors for gam
"""
fn = self.warp_data.fn
time = self.warp_data.time
qn = self.warp_data.qn
q0 = self.warp_data.q0
gam = self.warp_data.gam

M = time.shape[0]
if var_exp is not None:
if var_exp > 1:
raise Exception("var_exp is greater than 1")

if id is None:
mididx = int(np.round(M / 2))
else:
mididx = id

Nstd = stds.shape[0]

# set up for fPCA in q-space
mq_new = qn.mean(axis=1)
m_new = np.sign(fn[mididx, :]) * np.sqrt(np.abs(fn[mididx, :]))
mqn = np.append(mq_new, m_new.mean())
qn2 = np.vstack((qn, m_new))

# calculate vector space of warping functions
h = geo.gam_to_h(gam)

# joint fPCA
C = fminbound(find_C_h, 0, 200, (qn2, h, q0, 0.99, parallel, cores))
qhat, gamhat, cz, Psi_q, Psi_h, sz, U, Uh, Uz = jointfPCAhd(qn2, h, C, var_exp)

hc = C * h
mh = np.mean(hc, axis=1)

# geodesic paths
no = cz.shape[1]
q_pca = np.ndarray(shape=(M, Nstd, no), dtype=float)
f_pca = np.ndarray(shape=(M, Nstd, no), dtype=float)

for k in range(0, no):
for l in range(0, Nstd):
qhat = mqn + np.dot(Psi_q[:, k], stds[l] * np.sqrt(sz[k]))
hhat = np.dot(Psi_h[:, k], (stds[l] * np.sqrt(sz[k])) / C)
gamhat = fs.geometry.h_to_gam(hhat)

fhat = fs.utility_functions.cumtrapzmid(
time,
qhat[0:M] * np.fabs(qhat[0:M]),
np.sign(qhat[M]) * (qhat[M] * qhat[M]),
mididx,
)
f_pca[:, l, k] = fs.utility_functions.warp_f_gamma(
np.linspace(0, 1, M), fhat, gamhat
)
q_pca[:, l, k] = fs.utility_functions.warp_q_gamma(
np.linspace(0, 1, M), qhat[0:M], gamhat
)

self.q_pca = q_pca
self.f_pca = f_pca
self.latent = sz[0:no]
self.coef = cz[:, 0:no]
self.U_q = Psi_q
self.U_h = Psi_h
self.id = mididx
self.C = C
self.time = time
self.no = no
self.stds = stds
self.mqn = mqn
self.U = U
self.U1 = Uh
self.Uz = Uz
self.mh = mh

return

def project(self, f):
"""
project new data onto fPCA basis
Usage: obj.project(f)
:param f: numpy array (MxN) of N functions on M time
"""

q1 = fs.f_to_srsf(f, self.time)
M = self.time.shape[0]
n = q1.shape[1]
mq = self.warp_data.mqn
fn = np.zeros((M, n))
qn = np.zeros((M, n))
gam = np.zeros((M, n))
for ii in range(0, n):
gam[:, ii] = fs.optimum_reparam(mq, self.time, q1[:, ii])
fn[:, ii] = fs.warp_f_gamma(self.time, f[:, ii], gam[:, ii])
qn[:, ii] = fs.f_to_srsf(fn[:, ii], self.time)

m_new = np.sign(fn[self.id, :]) * np.sqrt(np.abs(fn[self.id, :]))
qn1 = np.vstack((qn, m_new))
C = self.C

h = geo.gam_to_h(gam)

c = (qn1 - self.mqn[:, np.newaxis]).T @ self.U
ch = (C*h - self.mh[:, np.newaxis]).T @ self.U1

Xi = np.hstack((c, ch))

cz = Xi @ self.Uz

self.new_coef = cz

return

def plot(self):
"""
plot plot elastic joint fPCA result
Usage: obj.plot()
"""
no = self.no
M = self.time.shape[0]
Nstd = self.stds.shape[0]
num_plot = int(np.ceil(no / 3))
CBcdict = {
"Bl": (0, 0, 0),
"Or": (0.9, 0.6, 0),
"SB": (0.35, 0.7, 0.9),
"bG": (0, 0.6, 0.5),
"Ye": (0.95, 0.9, 0.25),
"Bu": (0, 0.45, 0.7),
"Ve": (0.8, 0.4, 0),
"rP": (0.8, 0.6, 0.7),
}
cl = sorted(CBcdict.keys())
k = 0
for ii in range(0, num_plot):
if k > (no - 1):
break

fig, ax = plt.subplots(2, 3)

for k1 in range(0, 3):
k = k1 + (ii) * 3
axt = ax[0, k1]
if k > (no - 1):
break

for l in range(0, Nstd):
axt.plot(self.time, self.q_pca[0:M, l, k], color=CBcdict[cl[l]])

axt.set_title("q domain: PD %d" % (k + 1))

axt = ax[1, k1]
for l in range(0, Nstd):
axt.plot(self.time, self.f_pca[:, l, k], color=CBcdict[cl[l]])

axt.set_title("f domain: PD %d" % (k + 1))

fig.set_tight_layout(True)

cumm_coef = 100 * np.cumsum(self.latent) / sum(self.latent)
idx = np.arange(0, self.latent.shape[0]) + 1
plot.f_plot(idx, cumm_coef, "Coefficient Cumulative Percentage")
plt.ylabel("Percentage")
plt.xlabel("Index")
plt.show()

return


def jointfPCAd(qn, vec, C, m, mu_psi, parallel, cores):
(M, N) = qn.shape
g = np.vstack((qn, C * vec))
Expand Down Expand Up @@ -739,6 +979,83 @@ def jointfPCAd(qn, vec, C, m, mu_psi, parallel, cores):
return qhat, gamhat, a, U, s, mu_g, g, K


def jointfPCAhd(qn, h, C, var_exp):
(M, N) = qn.shape

# Run Univariate fPCA
# q space
K = np.cov(qn)

mqn = qn.mean(axis=1)

U, s, V = svd(K)
U, V = uf.svd_flip(U, V)

cumm_coef = np.cumsum(s) / s.sum()
no_q = int(np.argwhere(cumm_coef >= var_exp)[0][0])

c = (qn - mqn[:, np.newaxis]).T @ U
c = c[:, 0:no_q]
U = U[:, 0:no_q]

# h space
hc = C * h
mh = np.mean(hc, axis=1)
Kh = np.cov(hc)

Uh, sh, Vh = svd(Kh)
Uh, Vh = uf.svd_flip(Uh, Vh)

cumm_coef = np.cumsum(sh) / sh.sum()
no_h = int(np.argwhere(cumm_coef >= var_exp)[0][0]) + 1

ch = (hc - mh[:, np.newaxis]).T @ Uh
ch = ch[:, 0:no_h]
Uh = Uh[:, 0:no_h]

# Run Multivariate fPCA
Xi = np.hstack((c, ch))
Z = 1 / (Xi.shape[0] - 1) * Xi.T @ Xi

Uz, sz, Vz = svd(Z)
Uz, Vz = uf.svd_flip(Uz, Vz)

cz = Xi @ Uz

Psi_q = U @ Uz[0:no_q, :]
Psi_h = Uh @ Uz[no_q:, :]

hhat = Psi_h @ (cz).T
gamhat = fs.geometry.h_to_gam(hhat)

qhat = Psi_q @ cz.T + mqn[:, np.newaxis]

return qhat, gamhat, cz, Psi_q, Psi_h, sz, U, Uh, Uz


def find_C_h(C, qn, h, q0, var_exp, parallel, cores):
qhat, gamhat, cz, Psi_q, Psi_h, sz, U, Uh, Uz = jointfPCAhd(qn, h, C, var_exp)
(M, N) = qn.shape
time = np.linspace(0, 1, M - 1)

d = np.zeros(N)
if parallel:
out = Parallel(n_jobs=cores)(
delayed(find_C_sub)(time, qhat[0: (M - 1), n], gamhat[:, n], q0[:, n])
for n in range(N)
)
d = np.array(out)
else:
for i in range(0, N):
tmp = uf.warp_q_gamma(time, qhat[0: (M - 1), i],
uf.invertGamma(gamhat[:, i]))
d[i] = trapezoid((tmp - q0[:, i]) * (tmp - q0[:, i]), time)

dout = d.mean()

return dout


def jfpca_sub(mu_psi, vechat):
M = mu_psi.shape[0]
psihat = geo.exp_map(mu_psi, vechat)
Expand Down

0 comments on commit 3519d64

Please sign in to comment.